From 7f1763a2684b1ace866bc1714cf8e6b9297475fb Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Tue, 4 Jun 2024 13:54:09 -0400 Subject: [PATCH 1/2] Add validate method to PendingUpdate --- .../org/apache/iceberg/PendingUpdate.java | 14 + .../java/org/apache/iceberg/Validation.java | 55 + .../apache/iceberg/BaseReplaceSortOrder.java | 10 + .../iceberg/BaseUpdatePartitionSpec.java | 9 + .../org/apache/iceberg/PropertiesUpdate.java | 10 + .../org/apache/iceberg/RemoveSnapshots.java | 8 + .../java/org/apache/iceberg/SchemaUpdate.java | 8 + .../java/org/apache/iceberg/SetLocation.java | 14 +- .../apache/iceberg/SetSnapshotOperation.java | 10 + .../org/apache/iceberg/SetStatistics.java | 8 + .../org/apache/iceberg/SnapshotManager.java | 11 + .../org/apache/iceberg/SnapshotProducer.java | 10 + .../apache/iceberg/StaticTableOperations.java | 7 + .../UpdateSnapshotReferencesOperation.java | 10 + .../org/apache/iceberg/ValidationUtils.java | 30 + .../apache/iceberg/TestCustomValidations.java | 1520 +++++++++++++++++ .../org/apache/iceberg/TestTransaction.java | 97 ++ 17 files changed, 1830 insertions(+), 1 deletion(-) create mode 100644 api/src/main/java/org/apache/iceberg/Validation.java create mode 100644 core/src/main/java/org/apache/iceberg/ValidationUtils.java create mode 100644 core/src/test/java/org/apache/iceberg/TestCustomValidations.java diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java index f47b98238de0..900d095b1c1d 100644 --- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java +++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java @@ -21,6 +21,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; +import java.util.List; /** * API for table metadata changes. @@ -40,6 +41,19 @@ public interface PendingUpdate { */ T apply(); + /** + * Validate the current version of the table. + * + * @param validations A list of {@link Validation} which will be used to test whether it is safe + * to commit the pending changes to the current version of the table at commit time. + * @throws ValidationException If the update cannot be applied to the current table metadata. + * @throws UnsupportedOperationException If any of the supplied validations attempt to modify the + * table. + */ + default void validate(List validations) { + throw new UnsupportedOperationException(); + } + /** * Apply the pending changes and commit. * diff --git a/api/src/main/java/org/apache/iceberg/Validation.java b/api/src/main/java/org/apache/iceberg/Validation.java new file mode 100644 index 000000000000..247cc98f7091 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/Validation.java @@ -0,0 +1,55 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ +package org.apache.iceberg; + +import com.google.errorprone.annotations.FormatMethod; +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public class Validation { + private final Predicate predicate; + private final String message; + private final Object[] args; + + /** + * @param predicate The predicate the table needs to satisfy. + * @param message The message that will be included in the {@link ValidationException} that will + * be thrown by {@link Validation#validate} if the predicate is not satisfied. + * @param args The arguments referenced by the format specifiers in the message, if any. + */ + @FormatMethod + public Validation(Predicate
predicate, String message, Object... args) { + this.predicate = predicate; + this.message = message; + this.args = args; + } + + /** + * Ensures that the given table is valid according to the predicate. + * + * @param table The table to test. + * @throws ValidationException If the predicate is not satisfied by the given table. + */ + @SuppressWarnings("FormatStringAnnotation") + public void validate(Table table) { + ValidationException.check(predicate.test(table), message, args); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 2311c1b017d9..0934d24323e2 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -27,14 +27,17 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; public class BaseReplaceSortOrder implements ReplaceSortOrder { private final TableOperations ops; private final SortOrder.Builder builder; private TableMetadata base; + private final List pendingValidations = Lists.newArrayList(); BaseReplaceSortOrder(TableOperations ops) { this.ops = ops; @@ -47,6 +50,12 @@ public SortOrder apply() { return builder.build(); } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { Tasks.foreach(ops) @@ -62,6 +71,7 @@ public void commit() { this.base = ops.refresh(); SortOrder newOrder = apply(); TableMetadata updated = base.replaceSortOrder(newOrder); + ValidationUtils.validate(base, pendingValidations); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index c69f6f3844f9..f9d784dc1902 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -62,6 +62,8 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { private boolean setAsDefault; private int lastAssignedPartitionId; + private final List pendingValidations = Lists.newArrayList(); + BaseUpdatePartitionSpec(TableOperations ops) { this.ops = ops; this.caseSensitive = true; @@ -334,6 +336,12 @@ public PartitionSpec apply() { return builder.build(); } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { TableMetadata update; @@ -342,6 +350,7 @@ public void commit() { } else { update = base.addPartitionSpec(apply()); } + ValidationUtils.validate(base, pendingValidations); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 35338a689205..0ec3aeb808e7 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -27,10 +27,12 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; @@ -40,6 +42,7 @@ class PropertiesUpdate implements UpdateProperties { private final Map updates = Maps.newHashMap(); private final Set removals = Sets.newHashSet(); private TableMetadata base; + private final List pendingValidations = Lists.newArrayList(); PropertiesUpdate(TableOperations ops) { this.ops = ops; @@ -96,6 +99,12 @@ public Map apply() { return newProperties; } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { Tasks.foreach(ops) @@ -110,6 +119,7 @@ public void commit() { taskOps -> { Map newProperties = apply(); TableMetadata updated = base.replaceProperties(newProperties); + ValidationUtils.validate(base, pendingValidations); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 7558ea7d8a3e..4a11829d20a2 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -85,6 +85,7 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; + private final List pendingValidations = Lists.newArrayList(); RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -293,6 +294,12 @@ private Set unreferencedSnapshotsToRetain(Collection refs) { return snapshotsToRetain; } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { Tasks.foreach(ops) @@ -306,6 +313,7 @@ public void commit() { .run( item -> { TableMetadata updated = internalApply(); + ValidationUtils.validate(base, pendingValidations); ops.commit(base, updated); }); LOG.info("Committed snapshot changes"); diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 069097778606..78b723286988 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -64,6 +64,7 @@ class SchemaUpdate implements UpdateSchema { private boolean allowIncompatibleChanges = false; private Set identifierFieldNames; private boolean caseSensitive = true; + private final List pendingValidations = Lists.newArrayList(); SchemaUpdate(TableOperations ops) { this(ops, ops.current()); @@ -442,9 +443,16 @@ public Schema apply() { return newSchema; } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId)); + ValidationUtils.validate(base, pendingValidations); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..78a208eca127 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -27,12 +27,15 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; public class SetLocation implements UpdateLocation { private final TableOperations ops; private String newLocation; + private final List pendingValidations = Lists.newArrayList(); public SetLocation(TableOperations ops) { this.ops = ops; @@ -50,6 +53,11 @@ public String apply() { return newLocation; } + @Override + public void validate(List validations) { + pendingValidations.addAll(validations); + } + @Override public void commit() { TableMetadata base = ops.refresh(); @@ -61,6 +69,10 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); + .run( + taskOps -> { + ValidationUtils.validate(base, pendingValidations); + taskOps.commit(base, base.updateLocation(newLocation)); + }); } } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..3772f9f7de85 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; @@ -46,6 +47,7 @@ class SetSnapshotOperation implements PendingUpdate { private TableMetadata base; private Long targetSnapshotId = null; private boolean isRollback = false; + private final List pendingValidations = Lists.newArrayList(); SetSnapshotOperation(TableOperations ops) { this.ops = ops; @@ -105,6 +107,12 @@ public Snapshot apply() { return base.snapshot(targetSnapshotId); } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { Tasks.foreach(ops) @@ -123,6 +131,8 @@ public void commit() { .setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH) .build(); + ValidationUtils.validate(base, pendingValidations); + // Do commit this operation even if the metadata has not changed, as we need to // advance the hasLastOpCommited for the transaction's commit to work properly. // (Without any other operations in the transaction, the commitTransaction() call diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 41c7254d6cdc..349bf2d5c539 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -22,11 +22,13 @@ import java.util.Map; import java.util.Optional; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class SetStatistics implements UpdateStatistics { private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); + private final List pendingValidations = Lists.newArrayList(); public SetStatistics(TableOperations ops) { this.ops = ops; @@ -50,10 +52,16 @@ public List apply() { return internalApply(ops.current()).statisticsFiles(); } + @Override + public void validate(List validations) { + pendingValidations.addAll(validations); + } + @Override public void commit() { TableMetadata base = ops.current(); TableMetadata newMetadata = internalApply(base); + ValidationUtils.validate(base, pendingValidations); ops.commit(base, newMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index bb7ca4b11c11..c27ee9957573 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import java.util.List; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class SnapshotManager implements ManageSnapshots { @@ -174,6 +175,16 @@ public Snapshot apply() { return transaction.table().currentSnapshot(); } + @Override + public void validate(List validations) { + commitIfRefUpdatesExist(); + + // Add a no-op UpdateProperties to add given validations to transaction + UpdateProperties updateProperties = transaction.updateProperties(); + updateProperties.validate(validations); + updateProperties.commit(); + } + @Override public void commit() { commitIfRefUpdatesExist(); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index f750e88e86d9..fee43a183fea 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -114,6 +114,8 @@ public void accept(String file) { private String targetBranch = SnapshotRef.MAIN_BRANCH; private CommitMetrics commitMetrics; + private final List pendingValidations = Lists.newArrayList(); + protected SnapshotProducer(TableOperations ops) { this.ops = ops; this.strictCleanup = ops.requireStrictCleanup(); @@ -374,6 +376,12 @@ protected TableMetadata refresh() { return base; } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public void commit() { @@ -413,6 +421,8 @@ public void commit() { return; } + ValidationUtils.validate(base, pendingValidations); + // if the table UUID is missing, add it here. the UUID will be re-created each // time // this operation retries diff --git a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java index 77ee0920edc9..2595e2f92ee1 100644 --- a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java @@ -37,6 +37,13 @@ public StaticTableOperations(String metadataFileLocation, FileIO io) { this(metadataFileLocation, io, null); } + public StaticTableOperations(TableMetadata metadata) { + this.metadataFileLocation = metadata.metadataFileLocation(); + this.staticMetadata = metadata; + this.io = null; + this.locationProvider = null; + } + public StaticTableOperations( String metadataFileLocation, FileIO io, LocationProvider locationProvider) { this.io = io; diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java index f7ccea747a60..564451163fcb 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg; +import java.util.List; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SnapshotUtil; @@ -32,6 +34,7 @@ class UpdateSnapshotReferencesOperation implements PendingUpdate updatedRefs; private final TableMetadata base; + private final List pendingValidations = Lists.newArrayList(); UpdateSnapshotReferencesOperation(TableOperations ops) { this.ops = ops; @@ -44,9 +47,16 @@ public Map apply() { return updatedRefs; } + @Override + public void validate(List validations) { + ValidationUtils.validate(base, validations); + pendingValidations.addAll(validations); + } + @Override public void commit() { TableMetadata updated = internalApply(); + ValidationUtils.validate(base, pendingValidations); ops.commit(base, updated); } diff --git a/core/src/main/java/org/apache/iceberg/ValidationUtils.java b/core/src/main/java/org/apache/iceberg/ValidationUtils.java new file mode 100644 index 000000000000..86a9e6da3bcb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ValidationUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; + +class ValidationUtils { + private ValidationUtils() {} + + static void validate(TableMetadata base, List validations) { + Table currentTable = new BaseTable(new StaticTableOperations(base), null); + validations.forEach(validation -> validation.validate(currentTable)); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestCustomValidations.java b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java new file mode 100644 index 000000000000..771d4f16c635 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java @@ -0,0 +1,1520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.util.Objects; +import java.util.Set; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestCustomValidations extends V2TableTestBase { + + private final Validation alwaysPassValidation = + new Validation(currentTable -> true, "Always pass."); + + private final String alwaysFailMessage = "Always fail."; + private final Validation alwaysFailValidation = + new Validation(currentTable -> false, alwaysFailMessage); + + private final String watermarkKey = "watermark"; + + private void setWatermarkProperty(Table table, int watermarkValue) { + table.updateProperties().set(watermarkKey, Integer.toString(watermarkValue)).commit(); + } + + private final String watermarkFailMessagePattern = + "Current watermark value not equal to expected value=%s"; + + private Validation watermarkValidation(int expectedValue) { + return new Validation( + currentTable -> + Objects.equals( + currentTable.properties().get(watermarkKey), Integer.toString(expectedValue)), + watermarkFailMessagePattern, + expectedValue); + } + + private final Validation illegalValidation = + new Validation( + currentTable -> { + // illegal table modification inside validation predicate + currentTable.updateProperties().set(watermarkKey, Integer.toString(0)).commit(); + return true; + }, + "Predicate returned false."); + + @TestTemplate + public void testCherryPickPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + CherryPickOperation cherrypick = + new CherryPickOperation(table.name(), table.operations()).cherrypick(overwriteSnapshotId); + cherrypick.validate(ImmutableList.of(alwaysPassValidation)); + cherrypick.commit(); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(overwriteSnapshotId); + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testCherryPickFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + CherryPickOperation cherrypick = + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId); + cherrypick.validate(ImmutableList.of(alwaysFailValidation)); + cherrypick.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testCherryPickFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + CherryPickOperation pendingUpdate = + new CherryPickOperation(table.name(), table.operations()).cherrypick(overwriteSnapshotId); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testCherryPickFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + CherryPickOperation cherrypick = + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId); + cherrypick.validate(ImmutableList.of(illegalValidation)); + cherrypick.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesPassesValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); + deleteFiles.validate(ImmutableList.of(alwaysPassValidation)); + deleteFiles.commit(); + + validateTableFiles(table); + } + + @TestTemplate + public void testDeleteFilesFailsValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); + deleteFiles.validate(ImmutableList.of(alwaysFailValidation)); + deleteFiles.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesFailsValidationDueToConcurrentCommit() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = table.newDelete().deleteFile(FILE_A); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); + deleteFiles.validate(ImmutableList.of(illegalValidation)); + deleteFiles.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testExpireSnapshotsPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + ExpireSnapshots expireSnapshots = + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add); + expireSnapshots.validate(ImmutableList.of(alwaysPassValidation)); + expireSnapshots.commit(); + + assertThat(deletedFiles) + .as("Should remove the expired manifest list location") + .containsExactly(firstSnapshot.manifestListLocation()); + } + + @TestTemplate + public void testExpireSnapshotsFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + assertThatThrownBy( + () -> { + ExpireSnapshots expireSnapshots = + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add); + expireSnapshots.validate(ImmutableList.of(alwaysFailValidation)); + expireSnapshots.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testExpireSnapshotsFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testExpireSnapshotsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + assertThatThrownBy( + () -> { + ExpireSnapshots expireSnapshots = + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add); + expireSnapshots.validate(ImmutableList.of(illegalValidation)); + expireSnapshots.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testFastAppendPassesValidation() { + validateTableFiles(table); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(alwaysPassValidation)); + appendFiles.commit(); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testFastAppendFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(alwaysFailValidation)); + appendFiles.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testFastAppendFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = table.newFastAppend().appendFile(FILE_A); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testFastAppendFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(illegalValidation)); + appendFiles.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testManageSnapshotsPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + ManageSnapshots manageSnapshots = table.manageSnapshots().createTag(tagName, snapshotId); + manageSnapshots.validate(ImmutableList.of(alwaysPassValidation)); + manageSnapshots.commit(); + + assertThat(table.refs().get(tagName)) + .isNotNull() + .isEqualTo(SnapshotRef.tagBuilder(snapshotId).build()); + } + + @TestTemplate + public void testManageSnapshotsFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + assertThatThrownBy( + () -> { + ManageSnapshots manageSnapshots = + table.manageSnapshots().createTag(tagName, snapshotId); + manageSnapshots.validate(ImmutableList.of(alwaysFailValidation)); + manageSnapshots.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testManageSnapshotsFailsDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + setWatermarkProperty(table, 0); + + ManageSnapshots pendingUpdate = table.manageSnapshots().createTag(tagName, snapshotId); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Table metadata refresh is required"); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testManageSnapshotsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + assertThatThrownBy( + () -> { + ManageSnapshots manageSnapshots = + table.manageSnapshots().createTag(tagName, snapshotId); + manageSnapshots.validate(ImmutableList.of(illegalValidation)); + manageSnapshots.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testMergeAppendPassesValidation() { + validateTableFiles(table); + + AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(alwaysPassValidation)); + appendFiles.commit(); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testMergeAppendFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(alwaysFailValidation)); + appendFiles.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testMergeAppendFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + AppendFiles pendingUpdate = table.newAppend().appendFile(FILE_A); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testMergeAppendFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); + appendFiles.validate(ImmutableList.of(illegalValidation)); + appendFiles.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testOverwritePassesValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + OverwriteFiles overwriteFiles = + table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()).addFile(FILE_B); + overwriteFiles.validate(ImmutableList.of(alwaysPassValidation)); + overwriteFiles.commit(); + + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testOverwriteFailsValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + OverwriteFiles overwriteFiles = + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B); + overwriteFiles.validate(ImmutableList.of(alwaysFailValidation)); + overwriteFiles.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testOverwriteFailsValidationDueToConcurrentCommit() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + OverwriteFiles pendingUpdate = + table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()).addFile(FILE_B); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testOverwriteFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + OverwriteFiles overwriteFiles = + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B); + overwriteFiles.validate(ImmutableList.of(illegalValidation)); + overwriteFiles.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testReplacePartitionsPassesValidation() { + validateTableFiles(table); + + ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); + replacePartitions.validate(ImmutableList.of(alwaysPassValidation)); + replacePartitions.commit(); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testReplacePartitionsFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); + replacePartitions.validate(ImmutableList.of(alwaysFailValidation)); + replacePartitions.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplacePartitionsFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + ReplacePartitions pendingUpdate = table.newReplacePartitions().addFile(FILE_A).addFile(FILE_B); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplacePartitionsFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); + replacePartitions.validate(ImmutableList.of(illegalValidation)); + replacePartitions.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplaceSortOrderPassesValidation() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); + replaceSortOrder.validate(ImmutableList.of(alwaysPassValidation)); + replaceSortOrder.commit(); + + assertThat(table.sortOrder()) + .as("Table should reflect new sort order") + .isEqualTo(SortOrder.builderFor(table.schema()).asc("data").build()); + } + + @TestTemplate + public void testReplaceSortOrderFailsValidation() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + assertThatThrownBy( + () -> { + ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); + replaceSortOrder.validate(ImmutableList.of(alwaysFailValidation)); + replaceSortOrder.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testReplaceSortOrderFailsValidationDueToConcurrentCommit() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + setWatermarkProperty(table, 0); + + ReplaceSortOrder pendingUpdate = table.replaceSortOrder().asc("data"); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testReplaceSortOrderFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + assertThatThrownBy( + () -> { + ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); + replaceSortOrder.validate(ImmutableList.of(illegalValidation)); + replaceSortOrder.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testRewriteFilesPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + RewriteFiles rewriteFiles = + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); + rewriteFiles.validate(ImmutableList.of(alwaysPassValidation)); + rewriteFiles.commit(); + + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testRewriteFilesFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + RewriteFiles rewriteFiles = + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); + rewriteFiles.validate(ImmutableList.of(alwaysFailValidation)); + rewriteFiles.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteFilesFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + RewriteFiles pendingUpdate = + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteFilesFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> { + RewriteFiles rewriteFiles = + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); + rewriteFiles.validate(ImmutableList.of(illegalValidation)); + rewriteFiles.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteManifestsPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + RewriteManifests rewriteManifests = table.rewriteManifests().clusterBy(dataFile -> ""); + rewriteManifests.validate(ImmutableList.of(alwaysPassValidation)); + rewriteManifests.commit(); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1); + } + + @TestTemplate + public void testRewriteManifestsFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + assertThatThrownBy( + () -> { + RewriteManifests rewriteManifests = + table.rewriteManifests().clusterBy(dataFile -> ""); + rewriteManifests.validate(ImmutableList.of(alwaysFailValidation)); + rewriteManifests.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRewriteManifestsFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + setWatermarkProperty(table, 0); + + RewriteManifests pendingUpdate = table.rewriteManifests().clusterBy(dataFile -> ""); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRewriteManifestsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + assertThatThrownBy( + () -> { + RewriteManifests rewriteManifests = + table.rewriteManifests().clusterBy(dataFile -> ""); + rewriteManifests.validate(ImmutableList.of(illegalValidation)); + rewriteManifests.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRowDeltaPassesValidation() { + validateTableFiles(table); + + RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); + rowDelta.validate(ImmutableList.of(alwaysPassValidation)); + rowDelta.commit(); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRowDeltaFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); + rowDelta.validate(ImmutableList.of(alwaysFailValidation)); + rowDelta.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testRowDeltaFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + RowDelta pendingUpdate = table.newRowDelta().addRows(FILE_A); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testRowDeltaFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); + rowDelta.validate(ImmutableList.of(illegalValidation)); + rowDelta.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testSetSnapshotOperationPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + validateTableFiles(table, FILE_A, FILE_B); + + SetSnapshotOperation setSnapshotOperation = + new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); + setSnapshotOperation.validate(ImmutableList.of(alwaysPassValidation)); + setSnapshotOperation.commit(); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(firstSnapshotId); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testSetSnapshotOperationFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + assertThatThrownBy( + () -> { + SetSnapshotOperation setSnapshotOperation = + new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); + setSnapshotOperation.validate(ImmutableList.of(alwaysFailValidation)); + setSnapshotOperation.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testSetSnapshotOperationFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + setWatermarkProperty(table, 0); + + SetSnapshotOperation pendingUpdate = + new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testSetSnapshotOperationFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + assertThatThrownBy( + () -> { + SetSnapshotOperation setSnapshotOperation = + new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); + setSnapshotOperation.validate(ImmutableList.of(illegalValidation)); + setSnapshotOperation.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId); + updateSnapshotReferencesOperation.validate(ImmutableList.of(alwaysPassValidation)); + updateSnapshotReferencesOperation.commit(); + + assertThat(table.ops().refresh().ref(branchName)) + .isEqualTo(SnapshotRef.branchBuilder(firstSnapshotId).build()); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + assertThatThrownBy( + () -> { + UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId); + updateSnapshotReferencesOperation.validate(ImmutableList.of(alwaysFailValidation)); + updateSnapshotReferencesOperation.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationFailsDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + setWatermarkProperty(table, 0); + + UpdateSnapshotReferencesOperation pendingUpdate = + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + @TestTemplate + public void + testUpdateSnapshotReferencesOperationFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + assertThatThrownBy( + () -> { + UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId); + updateSnapshotReferencesOperation.validate(ImmutableList.of(illegalValidation)); + updateSnapshotReferencesOperation.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + private static GenericStatisticsFile genericStatisticsFile(Snapshot currentSnapshot) { + return new GenericStatisticsFile( + currentSnapshot.snapshotId(), + "/some/statistics/file.puffin", + 100, + 42, + ImmutableList.of( + new GenericBlobMetadata( + "stats-type", + currentSnapshot.snapshotId(), + currentSnapshot.sequenceNumber(), + ImmutableList.of(1, 2), + ImmutableMap.of("a-property", "some-property-value")))); + } + + @TestTemplate + public void testUpdateStatisticsPassesValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + GenericStatisticsFile statisticsFile = genericStatisticsFile(currentSnapshot); + UpdateStatistics updateStatistics = + table.updateStatistics().setStatistics(currentSnapshot.snapshotId(), statisticsFile); + updateStatistics.validate(ImmutableList.of(alwaysPassValidation)); + updateStatistics.commit(); + + assertThat(table.statisticsFiles()) + .as("Table should have statistics files") + .containsExactly(statisticsFile); + } + + @TestTemplate + public void testUpdateStatisticsFailsValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + assertThatThrownBy( + () -> { + UpdateStatistics updateStatistics = + table + .updateStatistics() + .setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); + updateStatistics.validate(ImmutableList.of(alwaysFailValidation)); + updateStatistics.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateStatisticsFailsValidationDueToConcurrentCommit() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + setWatermarkProperty(table, 0); + + UpdateStatistics pendingUpdate = + table + .updateStatistics() + .setStatistics(currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateStatisticsFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + assertThatThrownBy( + () -> { + UpdateStatistics updateStatistics = + table + .updateStatistics() + .setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); + updateStatistics.validate(ImmutableList.of(illegalValidation)); + updateStatistics.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateLocationPassesValidation(@TempDir File tempDir) { + String newLocation = tempDir.getAbsolutePath(); + assertThat(table.location()).isNotEqualTo(newLocation); + + UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); + updateLocation.validate(ImmutableList.of(alwaysPassValidation)); + updateLocation.commit(); + + assertThat(table.location()).isEqualTo(newLocation); + } + + @TestTemplate + public void testUpdateLocationFailsValidation(@TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + assertThatThrownBy( + () -> { + UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); + updateLocation.validate(ImmutableList.of(alwaysFailValidation)); + updateLocation.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void testUpdateLocationFailsValidationDueToConcurrentCommit(@TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + setWatermarkProperty(table, 0); + + UpdateLocation pendingUpdate = table.updateLocation().setLocation(newLocation); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void testUpdateLocationFailsDueToIllegalTableModificationInsideValidation( + @TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + assertThatThrownBy( + () -> { + UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); + updateLocation.validate(ImmutableList.of(illegalValidation)); + updateLocation.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void testUpdatePropertiesPassesValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + UpdateProperties updateProperties = table.updateProperties().set(key, value); + updateProperties.validate(ImmutableList.of(alwaysPassValidation)); + updateProperties.commit(); + + assertThat(table.properties().get(key)).isEqualTo(value); + } + + @TestTemplate + public void testUpdatePropertiesFailsValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + assertThatThrownBy( + () -> { + UpdateProperties updateProperties = table.updateProperties().set(key, value); + updateProperties.validate(ImmutableList.of(alwaysFailValidation)); + updateProperties.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.properties().get(key)).isNull(); + } + + @TestTemplate + public void testUpdatePropertiesFailsValidationDueToConcurrentCommit() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + setWatermarkProperty(table, 0); + + UpdateProperties pendingUpdate = table.updateProperties().set(key, value); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.properties().get(key)).isNull(); + } + + @TestTemplate + public void testUpdatePropertiesFailsDueToIllegalTableModificationInsideValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + assertThatThrownBy( + () -> { + UpdateProperties updateProperties = table.updateProperties().set(key, value); + updateProperties.validate(ImmutableList.of(illegalValidation)); + updateProperties.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.properties().get(key)).isNull(); + } + + private static final Schema ORIGINAL_SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); + + @TestTemplate + public void testUpdateSchemaPassesValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + UpdateSchema updateSchema = table.updateSchema().addColumn("bool", Types.BooleanType.get()); + updateSchema.validate(ImmutableList.of(alwaysPassValidation)); + updateSchema.commit(); + + assertThat( + table + .schema() + .sameSchema( + new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()), + optional(3, "bool", Types.BooleanType.get())))) + .as("Should include new bucket") + .isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + assertThatThrownBy( + () -> { + UpdateSchema updateSchema = + table.updateSchema().addColumn("bool", Types.BooleanType.get()); + updateSchema.validate(ImmutableList.of(alwaysFailValidation)); + updateSchema.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsDueToConcurrentCommit() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + setWatermarkProperty(table, 0); + + UpdateSchema pendingUpdate = table.updateSchema().addColumn("bool", Types.BooleanType.get()); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + assertThatThrownBy( + () -> { + UpdateSchema updateSchema = + table.updateSchema().addColumn("bool", Types.BooleanType.get()); + updateSchema.validate(ImmutableList.of(illegalValidation)); + updateSchema.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + private static final PartitionSpec ORIGINAL_SPEC = + PartitionSpec.builderFor(ORIGINAL_SCHEMA) + .bucket("data", BUCKETS_NUMBER, "data_bucket") + .withSpecId(0) + .build(); + + @TestTemplate + public void testUpdateSpecPassesValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + UpdatePartitionSpec updatePartitionSpec = + table.updateSpec().addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); + updatePartitionSpec.validate(ImmutableList.of(alwaysPassValidation)); + updatePartitionSpec.commit(); + + assertThat(table.spec()) + .as("Should include new bucket") + .isEqualTo( + PartitionSpec.builderFor(table.schema()) + .bucket("data", BUCKETS_NUMBER, "data_bucket") + .bucket("id", BUCKETS_NUMBER, "id_bucket") + .withSpecId(1) + .build()); + } + + @TestTemplate + public void testUpdateSpecFailsValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + assertThatThrownBy( + () -> { + UpdatePartitionSpec updatePartitionSpec = + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); + updatePartitionSpec.validate(ImmutableList.of(alwaysFailValidation)); + updatePartitionSpec.commit(); + }) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } + + @TestTemplate + public void testUpdateSpecFailsDueToConcurrentCommit() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + setWatermarkProperty(table, 0); + + UpdatePartitionSpec pendingUpdate = + table.updateSpec().addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); + pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(pendingUpdate::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } + + @TestTemplate + public void testUpdateSpecFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + assertThatThrownBy( + () -> { + UpdatePartitionSpec updatePartitionSpec = + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); + updatePartitionSpec.validate(ImmutableList.of(illegalValidation)); + updatePartitionSpec.commit(); + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 8fed7134fae1..2d73912dc8d5 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -26,12 +26,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; @@ -714,4 +717,98 @@ public void testTransactionRecommit() { assertThat(paths).isEqualTo(expectedPaths); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); } + + @TestTemplate + public void testTransactionPassesValidation() { + validateTableFiles(table); + + Transaction transaction = table.newTransaction(); + AppendFiles appendFiles = transaction.newAppend().appendFile(FILE_A); + appendFiles.validate( + ImmutableList.of(new Validation(currentTable -> true, "Custom validation failed."))); + appendFiles.commit(); + transaction.commitTransaction(); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testTransactionFailsValidation() { + validateTableFiles(table); + + Transaction transaction = table.newTransaction(); + assertThatThrownBy( + () -> { + AppendFiles appendFiles = transaction.newAppend().appendFile(FILE_A); + appendFiles.validate( + ImmutableList.of( + new Validation(currentTable -> false, "Custom validation failed."))); + appendFiles.commit(); + }, + "Transaction commit should fail") + .isInstanceOf(ValidationException.class) + .hasMessage("Custom validation failed."); + + validateTableFiles(table); + } + + @TestTemplate + public void testTransactionFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + String watermarkKey = "custom_watermark"; + String currentWatermarkValue = "1"; + String nextWatermarkValue = "2"; + + table.updateProperties().set(watermarkKey, currentWatermarkValue).commit(); + + Transaction transaction = table.newTransaction(); + transaction.newAppend().appendFile(FILE_A).commit(); + UpdateProperties updateProperties = + transaction.updateProperties().set(watermarkKey, nextWatermarkValue); + updateProperties.validate( + ImmutableList.of( + new Validation( + currentTable -> + Objects.equals( + currentTable.properties().get(watermarkKey), currentWatermarkValue), + "Current watermark value not equal to expected value=%s", + currentWatermarkValue))); + updateProperties.commit(); + + // concurrent update to the table which advances our watermark value before we're able to commit + table.updateProperties().set(watermarkKey, nextWatermarkValue).commit(); + + assertThatThrownBy(transaction::commitTransaction, "Transaction commit should fail") + .isInstanceOf(ValidationException.class) + .hasMessage("Current watermark value not equal to expected value=1"); + + validateTableFiles(table); + assertThat(table.properties().get(watermarkKey)).isEqualTo(nextWatermarkValue); + } + + @TestTemplate + public void testTransactionFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> { + AppendFiles appendFiles = table.newTransaction().newAppend().appendFile(FILE_A); + appendFiles.validate( + ImmutableList.of( + new Validation( + currentTable -> { + // illegal action + currentTable.updateProperties().set("key", "value").commit(); + return true; + }, + "Custom validation failed."))); + appendFiles.commit(); + }, + "Any attempts to modify a table inside a validation should throw an exception") + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } } From 4baa137f1c6ac6712c4315a573e520d4ef52be7f Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Mon, 26 Dec 2022 14:50:34 -0500 Subject: [PATCH 2/2] Add commitIf method to PendingUpdate --- .../org/apache/iceberg/PendingUpdate.java | 29 +- .../java/org/apache/iceberg/Validation.java | 30 +- ...ationUtils.java => BasePendingUpdate.java} | 15 +- .../apache/iceberg/BaseReplaceSortOrder.java | 13 +- .../iceberg/BaseUpdatePartitionSpec.java | 13 +- .../org/apache/iceberg/PropertiesUpdate.java | 13 +- .../org/apache/iceberg/RemoveSnapshots.java | 11 +- .../java/org/apache/iceberg/SchemaUpdate.java | 11 +- .../java/org/apache/iceberg/SetLocation.java | 12 +- .../apache/iceberg/SetSnapshotOperation.java | 12 +- .../org/apache/iceberg/SetStatistics.java | 12 +- .../org/apache/iceberg/SnapshotManager.java | 18 +- .../org/apache/iceberg/SnapshotProducer.java | 14 +- .../UpdateSnapshotReferencesOperation.java | 14 +- .../apache/iceberg/TestCustomValidations.java | 595 ++++++++---------- .../org/apache/iceberg/TestTransaction.java | 72 +-- 16 files changed, 362 insertions(+), 522 deletions(-) rename core/src/main/java/org/apache/iceberg/{ValidationUtils.java => BasePendingUpdate.java} (66%) diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java index 900d095b1c1d..6de3432f6157 100644 --- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java +++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java @@ -18,10 +18,10 @@ */ package org.apache.iceberg; +import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; -import java.util.List; /** * API for table metadata changes. @@ -42,31 +42,38 @@ public interface PendingUpdate { T apply(); /** - * Validate the current version of the table. + * Apply the pending changes and commit. + * + *

Changes are committed by calling the underlying table's commit method. + * + *

Once the commit is successful, the updated table will be refreshed. * - * @param validations A list of {@link Validation} which will be used to test whether it is safe - * to commit the pending changes to the current version of the table at commit time. * @throws ValidationException If the update cannot be applied to the current table metadata. - * @throws UnsupportedOperationException If any of the supplied validations attempt to modify the - * table. + * @throws CommitFailedException If the update cannot be committed due to conflicts. + * @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup + * should be done in this case. */ - default void validate(List validations) { - throw new UnsupportedOperationException(); - } + void commit(); /** - * Apply the pending changes and commit. + * Apply the pending changes, validate the current version of the table, and commit. * *

Changes are committed by calling the underlying table's commit method. * *

Once the commit is successful, the updated table will be refreshed. * + * @param validations A list of {@link Validation} which will be used to test whether it is safe + * to commit the pending changes to the current version of the table at commit time. * @throws ValidationException If the update cannot be applied to the current table metadata. + * @throws UnsupportedOperationException If any of the supplied validations attempt to modify the + * table it is given. * @throws CommitFailedException If the update cannot be committed due to conflicts. * @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup * should be done in this case. */ - void commit(); + default void commitIf(List validations) { + throw new UnsupportedOperationException(); + } /** * Generates update event to notify about metadata changes diff --git a/api/src/main/java/org/apache/iceberg/Validation.java b/api/src/main/java/org/apache/iceberg/Validation.java index 247cc98f7091..85c4448f15de 100644 --- a/api/src/main/java/org/apache/iceberg/Validation.java +++ b/api/src/main/java/org/apache/iceberg/Validation.java @@ -1,22 +1,20 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg; diff --git a/core/src/main/java/org/apache/iceberg/ValidationUtils.java b/core/src/main/java/org/apache/iceberg/BasePendingUpdate.java similarity index 66% rename from core/src/main/java/org/apache/iceberg/ValidationUtils.java rename to core/src/main/java/org/apache/iceberg/BasePendingUpdate.java index 86a9e6da3bcb..bfab91d32fa2 100644 --- a/core/src/main/java/org/apache/iceberg/ValidationUtils.java +++ b/core/src/main/java/org/apache/iceberg/BasePendingUpdate.java @@ -19,12 +19,19 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class ValidationUtils { - private ValidationUtils() {} +abstract class BasePendingUpdate implements PendingUpdate { + private final List pendingValidations = Lists.newArrayList(); - static void validate(TableMetadata base, List validations) { + @Override + public void commitIf(List validations) { + this.pendingValidations.addAll(validations); + commit(); + } + + protected final void validate(TableMetadata base) { Table currentTable = new BaseTable(new StaticTableOperations(base), null); - validations.forEach(validation -> validation.validate(currentTable)); + this.pendingValidations.forEach(validation -> validation.validate(currentTable)); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 0934d24323e2..0bdeedee3fe9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -27,17 +27,14 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; -import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.expressions.Term; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; -public class BaseReplaceSortOrder implements ReplaceSortOrder { +public class BaseReplaceSortOrder extends BasePendingUpdate implements ReplaceSortOrder { private final TableOperations ops; private final SortOrder.Builder builder; private TableMetadata base; - private final List pendingValidations = Lists.newArrayList(); BaseReplaceSortOrder(TableOperations ops) { this.ops = ops; @@ -50,12 +47,6 @@ public SortOrder apply() { return builder.build(); } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { Tasks.foreach(ops) @@ -71,7 +62,7 @@ public void commit() { this.base = ops.refresh(); SortOrder newOrder = apply(); TableMetadata updated = base.replaceSortOrder(newOrder); - ValidationUtils.validate(base, pendingValidations); + validate(base); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index f9d784dc1902..21781f86a01b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -41,7 +41,8 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; -class BaseUpdatePartitionSpec implements UpdatePartitionSpec { +class BaseUpdatePartitionSpec extends BasePendingUpdate + implements UpdatePartitionSpec { private final TableOperations ops; private final TableMetadata base; private final int formatVersion; @@ -62,8 +63,6 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { private boolean setAsDefault; private int lastAssignedPartitionId; - private final List pendingValidations = Lists.newArrayList(); - BaseUpdatePartitionSpec(TableOperations ops) { this.ops = ops; this.caseSensitive = true; @@ -336,12 +335,6 @@ public PartitionSpec apply() { return builder.build(); } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { TableMetadata update; @@ -350,7 +343,7 @@ public void commit() { } else { update = base.addPartitionSpec(apply()); } - ValidationUtils.validate(base, pendingValidations); + validate(base); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 0ec3aeb808e7..20868d290c68 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -27,22 +27,19 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; -class PropertiesUpdate implements UpdateProperties { +class PropertiesUpdate extends BasePendingUpdate> implements UpdateProperties { private final TableOperations ops; private final Map updates = Maps.newHashMap(); private final Set removals = Sets.newHashSet(); private TableMetadata base; - private final List pendingValidations = Lists.newArrayList(); PropertiesUpdate(TableOperations ops) { this.ops = ops; @@ -99,12 +96,6 @@ public Map apply() { return newProperties; } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { Tasks.foreach(ops) @@ -119,7 +110,7 @@ public void commit() { taskOps -> { Map newProperties = apply(); TableMetadata updated = base.replaceProperties(newProperties); - ValidationUtils.validate(base, pendingValidations); + validate(base); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 4a11829d20a2..0cdd20c8e39a 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -class RemoveSnapshots implements ExpireSnapshots { +class RemoveSnapshots extends BasePendingUpdate> implements ExpireSnapshots { private static final Logger LOG = LoggerFactory.getLogger(RemoveSnapshots.class); // Creates an executor service that runs each task in the thread that invokes execute/submit. @@ -85,7 +85,6 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; - private final List pendingValidations = Lists.newArrayList(); RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -294,12 +293,6 @@ private Set unreferencedSnapshotsToRetain(Collection refs) { return snapshotsToRetain; } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { Tasks.foreach(ops) @@ -313,7 +306,7 @@ public void commit() { .run( item -> { TableMetadata updated = internalApply(); - ValidationUtils.validate(base, pendingValidations); + validate(base); ops.commit(base, updated); }); LOG.info("Committed snapshot changes"); diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 78b723286988..23662b281eed 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; /** Schema evolution API implementation. */ -class SchemaUpdate implements UpdateSchema { +class SchemaUpdate extends BasePendingUpdate implements UpdateSchema { private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class); private static final int TABLE_ROOT_ID = -1; @@ -64,7 +64,6 @@ class SchemaUpdate implements UpdateSchema { private boolean allowIncompatibleChanges = false; private Set identifierFieldNames; private boolean caseSensitive = true; - private final List pendingValidations = Lists.newArrayList(); SchemaUpdate(TableOperations ops) { this(ops, ops.current()); @@ -443,16 +442,10 @@ public Schema apply() { return newSchema; } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId)); - ValidationUtils.validate(base, pendingValidations); + validate(base); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 78a208eca127..7efcf2059339 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -27,15 +27,12 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; -import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; -public class SetLocation implements UpdateLocation { +public class SetLocation extends BasePendingUpdate implements UpdateLocation { private final TableOperations ops; private String newLocation; - private final List pendingValidations = Lists.newArrayList(); public SetLocation(TableOperations ops) { this.ops = ops; @@ -53,11 +50,6 @@ public String apply() { return newLocation; } - @Override - public void validate(List validations) { - pendingValidations.addAll(validations); - } - @Override public void commit() { TableMetadata base = ops.refresh(); @@ -71,7 +63,7 @@ public void commit() { .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { - ValidationUtils.validate(base, pendingValidations); + validate(base); taskOps.commit(base, base.updateLocation(newLocation)); }); } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 3772f9f7de85..367a3f498278 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -31,7 +31,6 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; @@ -41,13 +40,12 @@ *

This update is not exposed though the Table API. Instead, it is a package-private part of the * Transaction API intended for use in {@link ManageSnapshots}. */ -class SetSnapshotOperation implements PendingUpdate { +class SetSnapshotOperation extends BasePendingUpdate implements PendingUpdate { private final TableOperations ops; private TableMetadata base; private Long targetSnapshotId = null; private boolean isRollback = false; - private final List pendingValidations = Lists.newArrayList(); SetSnapshotOperation(TableOperations ops) { this.ops = ops; @@ -107,12 +105,6 @@ public Snapshot apply() { return base.snapshot(targetSnapshotId); } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { Tasks.foreach(ops) @@ -131,7 +123,7 @@ public void commit() { .setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH) .build(); - ValidationUtils.validate(base, pendingValidations); + validate(base); // Do commit this operation even if the metadata has not changed, as we need to // advance the hasLastOpCommited for the transaction's commit to work properly. diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 349bf2d5c539..333617147a59 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -22,13 +22,12 @@ import java.util.Map; import java.util.Optional; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -public class SetStatistics implements UpdateStatistics { +public class SetStatistics extends BasePendingUpdate> + implements UpdateStatistics { private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); - private final List pendingValidations = Lists.newArrayList(); public SetStatistics(TableOperations ops) { this.ops = ops; @@ -52,16 +51,11 @@ public List apply() { return internalApply(ops.current()).statisticsFiles(); } - @Override - public void validate(List validations) { - pendingValidations.addAll(validations); - } - @Override public void commit() { TableMetadata base = ops.current(); TableMetadata newMetadata = internalApply(base); - ValidationUtils.validate(base, pendingValidations); + validate(base); ops.commit(base, newMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index c27ee9957573..54dc4349dcb6 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -175,16 +175,6 @@ public Snapshot apply() { return transaction.table().currentSnapshot(); } - @Override - public void validate(List validations) { - commitIfRefUpdatesExist(); - - // Add a no-op UpdateProperties to add given validations to transaction - UpdateProperties updateProperties = transaction.updateProperties(); - updateProperties.validate(validations); - updateProperties.commit(); - } - @Override public void commit() { commitIfRefUpdatesExist(); @@ -192,4 +182,12 @@ public void commit() { transaction.commitTransaction(); } } + + @Override + public void commitIf(List validations) { + commitIfRefUpdatesExist(); + // Add a no-op UpdateProperties to add given validations to transaction + transaction.updateProperties().commitIf(validations); + commit(); + } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index fee43a183fea..7dbacfa352d1 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -79,7 +79,8 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -abstract class SnapshotProducer implements SnapshotUpdate { +abstract class SnapshotProducer extends BasePendingUpdate + implements SnapshotUpdate { private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class); static final int MIN_FILE_GROUP_SIZE = 10_000; static final Set EMPTY_SET = Sets.newHashSet(); @@ -114,8 +115,6 @@ public void accept(String file) { private String targetBranch = SnapshotRef.MAIN_BRANCH; private CommitMetrics commitMetrics; - private final List pendingValidations = Lists.newArrayList(); - protected SnapshotProducer(TableOperations ops) { this.ops = ops; this.strictCleanup = ops.requireStrictCleanup(); @@ -376,12 +375,6 @@ protected TableMetadata refresh() { return base; } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public void commit() { @@ -413,6 +406,7 @@ public void commit() { } TableMetadata updated = update.build(); + validate(base); if (updated.changes().isEmpty()) { // do not commit if the metadata has not changed. for example, this may happen // when setting the current @@ -421,8 +415,6 @@ public void commit() { return; } - ValidationUtils.validate(base, pendingValidations); - // if the table UUID is missing, add it here. the UUID will be re-created each // time // this operation retries diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java index 564451163fcb..991da7cfa433 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -18,10 +18,8 @@ */ package org.apache.iceberg; -import java.util.List; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SnapshotUtil; @@ -29,12 +27,12 @@ * ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, rollBackTime, rollbackTo to * this class so that we can support those operations for refs. */ -class UpdateSnapshotReferencesOperation implements PendingUpdate> { +class UpdateSnapshotReferencesOperation extends BasePendingUpdate> + implements PendingUpdate> { private final TableOperations ops; private final Map updatedRefs; private final TableMetadata base; - private final List pendingValidations = Lists.newArrayList(); UpdateSnapshotReferencesOperation(TableOperations ops) { this.ops = ops; @@ -47,16 +45,10 @@ public Map apply() { return updatedRefs; } - @Override - public void validate(List validations) { - ValidationUtils.validate(base, validations); - pendingValidations.addAll(validations); - } - @Override public void commit() { TableMetadata updated = internalApply(); - ValidationUtils.validate(base, pendingValidations); + validate(base); ops.commit(base, updated); } diff --git a/core/src/test/java/org/apache/iceberg/TestCustomValidations.java b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java index 771d4f16c635..470a77b11584 100644 --- a/core/src/test/java/org/apache/iceberg/TestCustomValidations.java +++ b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java @@ -88,10 +88,9 @@ public void testCherryPickPassesValidation() { .snapshotId(); validateTableFiles(table, FILE_A); - CherryPickOperation cherrypick = - new CherryPickOperation(table.name(), table.operations()).cherrypick(overwriteSnapshotId); - cherrypick.validate(ImmutableList.of(alwaysPassValidation)); - cherrypick.commit(); + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(overwriteSnapshotId); validateTableFiles(table, FILE_B); @@ -111,13 +110,10 @@ public void testCherryPickFailsValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - CherryPickOperation cherrypick = - new CherryPickOperation(table.name(), table.operations()) - .cherrypick(overwriteSnapshotId); - cherrypick.validate(ImmutableList.of(alwaysFailValidation)); - cherrypick.commit(); - }) + () -> + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -142,12 +138,11 @@ public void testCherryPickFailsValidationDueToConcurrentCommit() { CherryPickOperation pendingUpdate = new CherryPickOperation(table.name(), table.operations()).cherrypick(overwriteSnapshotId); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -169,13 +164,10 @@ public void testCherryPickFailsDueToIllegalTableModificationInsideValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - CherryPickOperation cherrypick = - new CherryPickOperation(table.name(), table.operations()) - .cherrypick(overwriteSnapshotId); - cherrypick.validate(ImmutableList.of(illegalValidation)); - cherrypick.commit(); - }) + () -> + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -188,9 +180,7 @@ public void testDeleteFilesPassesValidation() { table.newFastAppend().appendFile(FILE_A).commit(); validateTableFiles(table, FILE_A); - DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); - deleteFiles.validate(ImmutableList.of(alwaysPassValidation)); - deleteFiles.commit(); + table.newDelete().deleteFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table); } @@ -201,11 +191,11 @@ public void testDeleteFilesFailsValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); - deleteFiles.validate(ImmutableList.of(alwaysFailValidation)); - deleteFiles.commit(); - }) + () -> + table + .newDelete() + .deleteFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -220,12 +210,11 @@ public void testDeleteFilesFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); PendingUpdate pendingUpdate = table.newDelete().deleteFile(FILE_A); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -238,11 +227,8 @@ public void testDeleteFilesFailsDueToIllegalTableModificationInsideValidation() validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - DeleteFiles deleteFiles = table.newDelete().deleteFile(FILE_A); - deleteFiles.validate(ImmutableList.of(illegalValidation)); - deleteFiles.commit(); - }) + () -> + table.newDelete().deleteFile(FILE_A).commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -256,13 +242,11 @@ public void testExpireSnapshotsPassesValidation() { table.newAppend().appendFile(FILE_B).commit(); Set deletedFiles = Sets.newHashSet(); - ExpireSnapshots expireSnapshots = - table - .expireSnapshots() - .expireSnapshotId(firstSnapshot.snapshotId()) - .deleteWith(deletedFiles::add); - expireSnapshots.validate(ImmutableList.of(alwaysPassValidation)); - expireSnapshots.commit(); + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(deletedFiles) .as("Should remove the expired manifest list location") @@ -277,15 +261,12 @@ public void testExpireSnapshotsFailsValidation() { Set deletedFiles = Sets.newHashSet(); assertThatThrownBy( - () -> { - ExpireSnapshots expireSnapshots = - table - .expireSnapshots() - .expireSnapshotId(firstSnapshot.snapshotId()) - .deleteWith(deletedFiles::add); - expireSnapshots.validate(ImmutableList.of(alwaysFailValidation)); - expireSnapshots.commit(); - }) + () -> + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -306,12 +287,11 @@ public void testExpireSnapshotsFailsValidationDueToConcurrentCommit() { .expireSnapshots() .expireSnapshotId(firstSnapshot.snapshotId()) .deleteWith(deletedFiles::add); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -326,15 +306,12 @@ public void testExpireSnapshotsFailsDueToIllegalTableModificationInsideValidatio Set deletedFiles = Sets.newHashSet(); assertThatThrownBy( - () -> { - ExpireSnapshots expireSnapshots = - table - .expireSnapshots() - .expireSnapshotId(firstSnapshot.snapshotId()) - .deleteWith(deletedFiles::add); - expireSnapshots.validate(ImmutableList.of(illegalValidation)); - expireSnapshots.commit(); - }) + () -> + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -345,9 +322,7 @@ public void testExpireSnapshotsFailsDueToIllegalTableModificationInsideValidatio public void testFastAppendPassesValidation() { validateTableFiles(table); - AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(alwaysPassValidation)); - appendFiles.commit(); + table.newFastAppend().appendFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_A); } @@ -357,11 +332,11 @@ public void testFastAppendFailsValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(alwaysFailValidation)); - appendFiles.commit(); - }) + () -> + table + .newFastAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -375,12 +350,11 @@ public void testFastAppendFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); PendingUpdate pendingUpdate = table.newFastAppend().appendFile(FILE_A); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -392,11 +366,11 @@ public void testFastAppendFailsDueToIllegalTableModificationInsideValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(illegalValidation)); - appendFiles.commit(); - }) + () -> + table + .newFastAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -410,9 +384,10 @@ public void testManageSnapshotsPassesValidation() { String tagName = "tag1"; assertThat(table.refs().get(tagName)).isNull(); - ManageSnapshots manageSnapshots = table.manageSnapshots().createTag(tagName, snapshotId); - manageSnapshots.validate(ImmutableList.of(alwaysPassValidation)); - manageSnapshots.commit(); + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.refs().get(tagName)) .isNotNull() @@ -427,12 +402,11 @@ public void testManageSnapshotsFailsValidation() { assertThat(table.refs().get(tagName)).isNull(); assertThatThrownBy( - () -> { - ManageSnapshots manageSnapshots = - table.manageSnapshots().createTag(tagName, snapshotId); - manageSnapshots.validate(ImmutableList.of(alwaysFailValidation)); - manageSnapshots.commit(); - }) + () -> + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -449,12 +423,11 @@ public void testManageSnapshotsFailsDueToConcurrentCommit() { setWatermarkProperty(table, 0); ManageSnapshots pendingUpdate = table.manageSnapshots().createTag(tagName, snapshotId); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(CommitFailedException.class) .hasMessage("Table metadata refresh is required"); @@ -469,12 +442,11 @@ public void testManageSnapshotsFailsDueToIllegalTableModificationInsideValidatio assertThat(table.refs().get(tagName)).isNull(); assertThatThrownBy( - () -> { - ManageSnapshots manageSnapshots = - table.manageSnapshots().createTag(tagName, snapshotId); - manageSnapshots.validate(ImmutableList.of(illegalValidation)); - manageSnapshots.commit(); - }) + () -> + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -485,9 +457,7 @@ public void testManageSnapshotsFailsDueToIllegalTableModificationInsideValidatio public void testMergeAppendPassesValidation() { validateTableFiles(table); - AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(alwaysPassValidation)); - appendFiles.commit(); + table.newAppend().appendFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_A); } @@ -497,11 +467,11 @@ public void testMergeAppendFailsValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(alwaysFailValidation)); - appendFiles.commit(); - }) + () -> + table + .newAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -515,12 +485,11 @@ public void testMergeAppendFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); AppendFiles pendingUpdate = table.newAppend().appendFile(FILE_A); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -532,11 +501,8 @@ public void testMergeAppendFailsDueToIllegalTableModificationInsideValidation() validateTableFiles(table); assertThatThrownBy( - () -> { - AppendFiles appendFiles = table.newAppend().appendFile(FILE_A); - appendFiles.validate(ImmutableList.of(illegalValidation)); - appendFiles.commit(); - }) + () -> + table.newAppend().appendFile(FILE_A).commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -548,10 +514,11 @@ public void testOverwritePassesValidation() { table.newFastAppend().appendFile(FILE_A).commit(); validateTableFiles(table, FILE_A); - OverwriteFiles overwriteFiles = - table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()).addFile(FILE_B); - overwriteFiles.validate(ImmutableList.of(alwaysPassValidation)); - overwriteFiles.commit(); + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_B); } @@ -562,15 +529,12 @@ public void testOverwriteFailsValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - OverwriteFiles overwriteFiles = - table - .newOverwrite() - .overwriteByRowFilter(Expressions.alwaysTrue()) - .addFile(FILE_B); - overwriteFiles.validate(ImmutableList.of(alwaysFailValidation)); - overwriteFiles.commit(); - }) + () -> + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -586,12 +550,11 @@ public void testOverwriteFailsValidationDueToConcurrentCommit() { OverwriteFiles pendingUpdate = table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()).addFile(FILE_B); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -604,15 +567,12 @@ public void testOverwriteFailsDueToIllegalTableModificationInsideValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - OverwriteFiles overwriteFiles = - table - .newOverwrite() - .overwriteByRowFilter(Expressions.alwaysTrue()) - .addFile(FILE_B); - overwriteFiles.validate(ImmutableList.of(illegalValidation)); - overwriteFiles.commit(); - }) + () -> + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -623,9 +583,7 @@ public void testOverwriteFailsDueToIllegalTableModificationInsideValidation() { public void testReplacePartitionsPassesValidation() { validateTableFiles(table); - ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); - replacePartitions.validate(ImmutableList.of(alwaysPassValidation)); - replacePartitions.commit(); + table.newReplacePartitions().addFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_A); } @@ -635,11 +593,11 @@ public void testReplacePartitionsFailsValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); - replacePartitions.validate(ImmutableList.of(alwaysFailValidation)); - replacePartitions.commit(); - }) + () -> + table + .newReplacePartitions() + .addFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -653,12 +611,11 @@ public void testReplacePartitionsFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); ReplacePartitions pendingUpdate = table.newReplacePartitions().addFile(FILE_A).addFile(FILE_B); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -670,11 +627,11 @@ public void testReplacePartitionsFailsDueToIllegalTableModificationInsideValidat validateTableFiles(table); assertThatThrownBy( - () -> { - ReplacePartitions replacePartitions = table.newReplacePartitions().addFile(FILE_A); - replacePartitions.validate(ImmutableList.of(illegalValidation)); - replacePartitions.commit(); - }) + () -> + table + .newReplacePartitions() + .addFile(FILE_A) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -685,9 +642,7 @@ public void testReplacePartitionsFailsDueToIllegalTableModificationInsideValidat public void testReplaceSortOrderPassesValidation() { assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); - ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); - replaceSortOrder.validate(ImmutableList.of(alwaysPassValidation)); - replaceSortOrder.commit(); + table.replaceSortOrder().asc("data").commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.sortOrder()) .as("Table should reflect new sort order") @@ -699,11 +654,11 @@ public void testReplaceSortOrderFailsValidation() { assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); assertThatThrownBy( - () -> { - ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); - replaceSortOrder.validate(ImmutableList.of(alwaysFailValidation)); - replaceSortOrder.commit(); - }) + () -> + table + .replaceSortOrder() + .asc("data") + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -717,12 +672,11 @@ public void testReplaceSortOrderFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); ReplaceSortOrder pendingUpdate = table.replaceSortOrder().asc("data"); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -734,11 +688,8 @@ public void testReplaceSortOrderFailsDueToIllegalTableModificationInsideValidati assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); assertThatThrownBy( - () -> { - ReplaceSortOrder replaceSortOrder = table.replaceSortOrder().asc("data"); - replaceSortOrder.validate(ImmutableList.of(illegalValidation)); - replaceSortOrder.commit(); - }) + () -> + table.replaceSortOrder().asc("data").commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -750,10 +701,10 @@ public void testRewriteFilesPassesValidation() { table.newAppend().appendFile(FILE_A).commit(); validateTableFiles(table, FILE_A); - RewriteFiles rewriteFiles = - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); - rewriteFiles.validate(ImmutableList.of(alwaysPassValidation)); - rewriteFiles.commit(); + table + .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_B); } @@ -764,12 +715,11 @@ public void testRewriteFilesFailsValidation() { validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - RewriteFiles rewriteFiles = - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); - rewriteFiles.validate(ImmutableList.of(alwaysFailValidation)); - rewriteFiles.commit(); - }) + () -> + table + .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -785,12 +735,11 @@ public void testRewriteFilesFailsValidationDueToConcurrentCommit() { RewriteFiles pendingUpdate = table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -803,12 +752,11 @@ public void testRewriteFilesFailsDueToIllegalTableModificationInsideValidation() validateTableFiles(table, FILE_A); assertThatThrownBy( - () -> { - RewriteFiles rewriteFiles = - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); - rewriteFiles.validate(ImmutableList.of(illegalValidation)); - rewriteFiles.commit(); - }) + () -> + table + .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -821,9 +769,10 @@ public void testRewriteManifestsPassesValidation() { table.newAppend().appendFile(FILE_B).commit(); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); - RewriteManifests rewriteManifests = table.rewriteManifests().clusterBy(dataFile -> ""); - rewriteManifests.validate(ImmutableList.of(alwaysPassValidation)); - rewriteManifests.commit(); + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1); } @@ -835,12 +784,11 @@ public void testRewriteManifestsFailsValidation() { assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); assertThatThrownBy( - () -> { - RewriteManifests rewriteManifests = - table.rewriteManifests().clusterBy(dataFile -> ""); - rewriteManifests.validate(ImmutableList.of(alwaysFailValidation)); - rewriteManifests.commit(); - }) + () -> + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -856,12 +804,11 @@ public void testRewriteManifestsFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); RewriteManifests pendingUpdate = table.rewriteManifests().clusterBy(dataFile -> ""); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -875,12 +822,11 @@ public void testRewriteManifestsFailsDueToIllegalTableModificationInsideValidati assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); assertThatThrownBy( - () -> { - RewriteManifests rewriteManifests = - table.rewriteManifests().clusterBy(dataFile -> ""); - rewriteManifests.validate(ImmutableList.of(illegalValidation)); - rewriteManifests.commit(); - }) + () -> + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -891,9 +837,7 @@ public void testRewriteManifestsFailsDueToIllegalTableModificationInsideValidati public void testRowDeltaPassesValidation() { validateTableFiles(table); - RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); - rowDelta.validate(ImmutableList.of(alwaysPassValidation)); - rowDelta.commit(); + table.newRowDelta().addRows(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); validateTableFiles(table, FILE_A); } @@ -903,11 +847,11 @@ public void testRowDeltaFailsValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); - rowDelta.validate(ImmutableList.of(alwaysFailValidation)); - rowDelta.commit(); - }) + () -> + table + .newRowDelta() + .addRows(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -921,12 +865,11 @@ public void testRowDeltaFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); RowDelta pendingUpdate = table.newRowDelta().addRows(FILE_A); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -938,11 +881,7 @@ public void testRowDeltaFailsDueToIllegalTableModificationInsideValidation() { validateTableFiles(table); assertThatThrownBy( - () -> { - RowDelta rowDelta = table.newRowDelta().addRows(FILE_A); - rowDelta.validate(ImmutableList.of(illegalValidation)); - rowDelta.commit(); - }) + () -> table.newRowDelta().addRows(FILE_A).commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -956,10 +895,9 @@ public void testSetSnapshotOperationPassesValidation() { table.newAppend().appendFile(FILE_B).commit(); validateTableFiles(table, FILE_A, FILE_B); - SetSnapshotOperation setSnapshotOperation = - new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); - setSnapshotOperation.validate(ImmutableList.of(alwaysPassValidation)); - setSnapshotOperation.commit(); + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(firstSnapshotId); validateTableFiles(table, FILE_A); @@ -974,12 +912,10 @@ public void testSetSnapshotOperationFailsValidation() { validateTableFiles(table, FILE_A, FILE_B); assertThatThrownBy( - () -> { - SetSnapshotOperation setSnapshotOperation = - new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); - setSnapshotOperation.validate(ImmutableList.of(alwaysFailValidation)); - setSnapshotOperation.commit(); - }) + () -> + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -999,12 +935,11 @@ public void testSetSnapshotOperationFailsValidationDueToConcurrentCommit() { SetSnapshotOperation pendingUpdate = new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -1021,12 +956,10 @@ public void testSetSnapshotOperationFailsDueToIllegalTableModificationInsideVali validateTableFiles(table, FILE_A, FILE_B); assertThatThrownBy( - () -> { - SetSnapshotOperation setSnapshotOperation = - new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); - setSnapshotOperation.validate(ImmutableList.of(illegalValidation)); - setSnapshotOperation.commit(); - }) + () -> + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1041,11 +974,9 @@ public void testUpdateSnapshotReferencesOperationPassesValidation() { String branchName = "feature-develop"; assertThat(table.ops().refresh().ref(branchName)).isNull(); - UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = - new UpdateSnapshotReferencesOperation(table.operations()) - .createBranch(branchName, firstSnapshotId); - updateSnapshotReferencesOperation.validate(ImmutableList.of(alwaysPassValidation)); - updateSnapshotReferencesOperation.commit(); + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.ops().refresh().ref(branchName)) .isEqualTo(SnapshotRef.branchBuilder(firstSnapshotId).build()); @@ -1059,13 +990,10 @@ public void testUpdateSnapshotReferencesOperationFailsValidation() { assertThat(table.ops().refresh().ref(branchName)).isNull(); assertThatThrownBy( - () -> { - UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = - new UpdateSnapshotReferencesOperation(table.operations()) - .createBranch(branchName, firstSnapshotId); - updateSnapshotReferencesOperation.validate(ImmutableList.of(alwaysFailValidation)); - updateSnapshotReferencesOperation.commit(); - }) + () -> + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1084,12 +1012,11 @@ public void testUpdateSnapshotReferencesOperationFailsDueToConcurrentCommit() { UpdateSnapshotReferencesOperation pendingUpdate = new UpdateSnapshotReferencesOperation(table.operations()) .createBranch(branchName, firstSnapshotId); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(CommitFailedException.class) .hasMessage("Cannot commit changes based on stale metadata"); @@ -1105,13 +1032,10 @@ public void testUpdateSnapshotReferencesOperationFailsDueToConcurrentCommit() { assertThat(table.ops().refresh().ref(branchName)).isNull(); assertThatThrownBy( - () -> { - UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation = - new UpdateSnapshotReferencesOperation(table.operations()) - .createBranch(branchName, firstSnapshotId); - updateSnapshotReferencesOperation.validate(ImmutableList.of(illegalValidation)); - updateSnapshotReferencesOperation.commit(); - }) + () -> + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1140,10 +1064,10 @@ public void testUpdateStatisticsPassesValidation() { assertThat(table.statisticsFiles()).isEmpty(); GenericStatisticsFile statisticsFile = genericStatisticsFile(currentSnapshot); - UpdateStatistics updateStatistics = - table.updateStatistics().setStatistics(currentSnapshot.snapshotId(), statisticsFile); - updateStatistics.validate(ImmutableList.of(alwaysPassValidation)); - updateStatistics.commit(); + table + .updateStatistics() + .setStatistics(currentSnapshot.snapshotId(), statisticsFile) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.statisticsFiles()) .as("Table should have statistics files") @@ -1157,15 +1081,12 @@ public void testUpdateStatisticsFailsValidation() { assertThat(table.statisticsFiles()).isEmpty(); assertThatThrownBy( - () -> { - UpdateStatistics updateStatistics = - table - .updateStatistics() - .setStatistics( - currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); - updateStatistics.validate(ImmutableList.of(alwaysFailValidation)); - updateStatistics.commit(); - }) + () -> + table + .updateStatistics() + .setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1184,12 +1105,11 @@ public void testUpdateStatisticsFailsValidationDueToConcurrentCommit() { table .updateStatistics() .setStatistics(currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -1203,15 +1123,12 @@ public void testUpdateStatisticsFailsDueToIllegalTableModificationInsideValidati assertThat(table.statisticsFiles()).isEmpty(); assertThatThrownBy( - () -> { - UpdateStatistics updateStatistics = - table - .updateStatistics() - .setStatistics( - currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); - updateStatistics.validate(ImmutableList.of(illegalValidation)); - updateStatistics.commit(); - }) + () -> + table + .updateStatistics() + .setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1223,9 +1140,10 @@ public void testUpdateLocationPassesValidation(@TempDir File tempDir) { String newLocation = tempDir.getAbsolutePath(); assertThat(table.location()).isNotEqualTo(newLocation); - UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); - updateLocation.validate(ImmutableList.of(alwaysPassValidation)); - updateLocation.commit(); + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.location()).isEqualTo(newLocation); } @@ -1237,11 +1155,11 @@ public void testUpdateLocationFailsValidation(@TempDir File tempDir) { assertThat(originalLocation).isNotEqualTo(newLocation); assertThatThrownBy( - () -> { - UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); - updateLocation.validate(ImmutableList.of(alwaysFailValidation)); - updateLocation.commit(); - }) + () -> + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1257,12 +1175,11 @@ public void testUpdateLocationFailsValidationDueToConcurrentCommit(@TempDir File setWatermarkProperty(table, 0); UpdateLocation pendingUpdate = table.updateLocation().setLocation(newLocation); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -1277,11 +1194,11 @@ public void testUpdateLocationFailsDueToIllegalTableModificationInsideValidation assertThat(originalLocation).isNotEqualTo(newLocation); assertThatThrownBy( - () -> { - UpdateLocation updateLocation = table.updateLocation().setLocation(newLocation); - updateLocation.validate(ImmutableList.of(illegalValidation)); - updateLocation.commit(); - }) + () -> + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1294,9 +1211,7 @@ public void testUpdatePropertiesPassesValidation() { String value = "newValue"; assertThat(table.properties().get(key)).isNull(); - UpdateProperties updateProperties = table.updateProperties().set(key, value); - updateProperties.validate(ImmutableList.of(alwaysPassValidation)); - updateProperties.commit(); + table.updateProperties().set(key, value).commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.properties().get(key)).isEqualTo(value); } @@ -1308,11 +1223,11 @@ public void testUpdatePropertiesFailsValidation() { assertThat(table.properties().get(key)).isNull(); assertThatThrownBy( - () -> { - UpdateProperties updateProperties = table.updateProperties().set(key, value); - updateProperties.validate(ImmutableList.of(alwaysFailValidation)); - updateProperties.commit(); - }) + () -> + table + .updateProperties() + .set(key, value) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1328,12 +1243,11 @@ public void testUpdatePropertiesFailsValidationDueToConcurrentCommit() { setWatermarkProperty(table, 0); UpdateProperties pendingUpdate = table.updateProperties().set(key, value); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(ValidationException.class) .hasMessage(watermarkFailMessagePattern, 0); @@ -1347,11 +1261,11 @@ public void testUpdatePropertiesFailsDueToIllegalTableModificationInsideValidati assertThat(table.properties().get(key)).isNull(); assertThatThrownBy( - () -> { - UpdateProperties updateProperties = table.updateProperties().set(key, value); - updateProperties.validate(ImmutableList.of(illegalValidation)); - updateProperties.commit(); - }) + () -> + table + .updateProperties() + .set(key, value) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1366,9 +1280,10 @@ public void testUpdatePropertiesFailsDueToIllegalTableModificationInsideValidati public void testUpdateSchemaPassesValidation() { assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); - UpdateSchema updateSchema = table.updateSchema().addColumn("bool", Types.BooleanType.get()); - updateSchema.validate(ImmutableList.of(alwaysPassValidation)); - updateSchema.commit(); + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat( table @@ -1387,12 +1302,11 @@ public void testUpdateSchemaFailsValidation() { assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); assertThatThrownBy( - () -> { - UpdateSchema updateSchema = - table.updateSchema().addColumn("bool", Types.BooleanType.get()); - updateSchema.validate(ImmutableList.of(alwaysFailValidation)); - updateSchema.commit(); - }) + () -> + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1406,12 +1320,11 @@ public void testUpdateSchemaFailsDueToConcurrentCommit() { setWatermarkProperty(table, 0); UpdateSchema pendingUpdate = table.updateSchema().addColumn("bool", Types.BooleanType.get()); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(CommitFailedException.class) .hasMessage("Cannot commit changes based on stale metadata"); @@ -1423,12 +1336,11 @@ public void testUpdateSchemaFailsDueToIllegalTableModificationInsideValidation() assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); assertThatThrownBy( - () -> { - UpdateSchema updateSchema = - table.updateSchema().addColumn("bool", Types.BooleanType.get()); - updateSchema.validate(ImmutableList.of(illegalValidation)); - updateSchema.commit(); - }) + () -> + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); @@ -1445,10 +1357,10 @@ public void testUpdateSchemaFailsDueToIllegalTableModificationInsideValidation() public void testUpdateSpecPassesValidation() { assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); - UpdatePartitionSpec updatePartitionSpec = - table.updateSpec().addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); - updatePartitionSpec.validate(ImmutableList.of(alwaysPassValidation)); - updatePartitionSpec.commit(); + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(alwaysPassValidation)); assertThat(table.spec()) .as("Should include new bucket") @@ -1465,14 +1377,11 @@ public void testUpdateSpecFailsValidation() { assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); assertThatThrownBy( - () -> { - UpdatePartitionSpec updatePartitionSpec = - table - .updateSpec() - .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); - updatePartitionSpec.validate(ImmutableList.of(alwaysFailValidation)); - updatePartitionSpec.commit(); - }) + () -> + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(alwaysFailValidation))) .isInstanceOf(ValidationException.class) .hasMessage(alwaysFailMessage); @@ -1487,12 +1396,11 @@ public void testUpdateSpecFailsDueToConcurrentCommit() { UpdatePartitionSpec pendingUpdate = table.updateSpec().addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); - pendingUpdate.validate(ImmutableList.of(watermarkValidation(0))); // concurrent update to the table which advances our watermark value before we're able to commit setWatermarkProperty(table, 1); - assertThatThrownBy(pendingUpdate::commit) + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) .isInstanceOf(CommitFailedException.class) .hasMessage("Cannot commit changes based on stale metadata"); @@ -1504,14 +1412,11 @@ public void testUpdateSpecFailsDueToIllegalTableModificationInsideValidation() { assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); assertThatThrownBy( - () -> { - UpdatePartitionSpec updatePartitionSpec = - table - .updateSpec() - .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); - updatePartitionSpec.validate(ImmutableList.of(illegalValidation)); - updatePartitionSpec.commit(); - }) + () -> + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(illegalValidation))) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table"); diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 2d73912dc8d5..427fd6816947 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -723,10 +723,11 @@ public void testTransactionPassesValidation() { validateTableFiles(table); Transaction transaction = table.newTransaction(); - AppendFiles appendFiles = transaction.newAppend().appendFile(FILE_A); - appendFiles.validate( - ImmutableList.of(new Validation(currentTable -> true, "Custom validation failed."))); - appendFiles.commit(); + transaction + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of(new Validation(currentTable -> true, "Custom validation failed."))); transaction.commitTransaction(); validateTableFiles(table, FILE_A); @@ -738,13 +739,13 @@ public void testTransactionFailsValidation() { Transaction transaction = table.newTransaction(); assertThatThrownBy( - () -> { - AppendFiles appendFiles = transaction.newAppend().appendFile(FILE_A); - appendFiles.validate( - ImmutableList.of( - new Validation(currentTable -> false, "Custom validation failed."))); - appendFiles.commit(); - }, + () -> + transaction + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of( + new Validation(currentTable -> false, "Custom validation failed."))), "Transaction commit should fail") .isInstanceOf(ValidationException.class) .hasMessage("Custom validation failed."); @@ -764,17 +765,17 @@ public void testTransactionFailsValidationDueToConcurrentCommit() { Transaction transaction = table.newTransaction(); transaction.newAppend().appendFile(FILE_A).commit(); - UpdateProperties updateProperties = - transaction.updateProperties().set(watermarkKey, nextWatermarkValue); - updateProperties.validate( - ImmutableList.of( - new Validation( - currentTable -> - Objects.equals( - currentTable.properties().get(watermarkKey), currentWatermarkValue), - "Current watermark value not equal to expected value=%s", - currentWatermarkValue))); - updateProperties.commit(); + transaction + .updateProperties() + .set(watermarkKey, nextWatermarkValue) + .commitIf( + ImmutableList.of( + new Validation( + currentTable -> + Objects.equals( + currentTable.properties().get(watermarkKey), currentWatermarkValue), + "Current watermark value not equal to expected value=%s", + currentWatermarkValue))); // concurrent update to the table which advances our watermark value before we're able to commit table.updateProperties().set(watermarkKey, nextWatermarkValue).commit(); @@ -792,19 +793,20 @@ public void testTransactionFailsDueToIllegalTableModificationInsideValidation() validateTableFiles(table); assertThatThrownBy( - () -> { - AppendFiles appendFiles = table.newTransaction().newAppend().appendFile(FILE_A); - appendFiles.validate( - ImmutableList.of( - new Validation( - currentTable -> { - // illegal action - currentTable.updateProperties().set("key", "value").commit(); - return true; - }, - "Custom validation failed."))); - appendFiles.commit(); - }, + () -> + table + .newTransaction() + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of( + new Validation( + currentTable -> { + // illegal action + currentTable.updateProperties().set("key", "value").commit(); + return true; + }, + "Custom validation failed."))), "Any attempts to modify a table inside a validation should throw an exception") .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot modify a static table");