From be582274a39b15fc25d0423e5e60ebca92b2b614 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 14 Jan 2025 18:17:11 +0800 Subject: [PATCH 1/2] [core] Introduce SnapshotCommit to abstract atomically commit --- .../org/apache/paimon/AbstractFileStore.java | 12 +- .../paimon/catalog/AbstractCatalog.java | 8 +- .../apache/paimon/catalog/CatalogUtils.java | 5 +- .../paimon/catalog/FileSystemCatalog.java | 43 +++++-- .../catalog/RenamingSnapshotCommit.java | 120 ++++++++++++++++++ .../apache/paimon/catalog/SnapshotCommit.java | 37 ++++++ .../org/apache/paimon/jdbc/JdbcCatalog.java | 25 +++- .../paimon/operation/FileStoreCommit.java | 3 - .../paimon/operation/FileStoreCommitImpl.java | 91 +++---------- .../paimon/operation/PartitionExpire.java | 5 - .../org/apache/paimon/rest/RESTCatalog.java | 28 +++- .../apache/paimon/rest/RESTCatalogLoader.java | 3 +- .../rest/RESTSnapshotCommitFactory.java | 54 ++++++++ .../org/apache/paimon/rest/ResourcePaths.java | 4 + .../rest/requests/CommitTableRequest.java | 77 +++++++++++ .../rest/responses/CommitTableResponse.java | 46 +++++++ .../apache/paimon/schema/SchemaManager.java | 16 +-- .../paimon/table/AbstractFileStoreTable.java | 1 - .../paimon/table/CatalogEnvironment.java | 27 ++-- .../paimon/table/sink/TableCommitImpl.java | 9 +- .../paimon/operation/PartitionExpireTest.java | 2 +- .../apache/paimon/rest/RESTCatalogServer.java | 23 ++++ .../org/apache/paimon/hive/HiveCatalog.java | 42 ++++-- .../paimon/hive/HiveCatalogITCaseBase.java | 12 +- paimon-open-api/rest-catalog-open-api.yaml | 69 ++++++++++ .../open/api/RESTCatalogController.java | 26 ++++ 26 files changed, 630 insertions(+), 158 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/responses/CommitTableResponse.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index d2efe5aef566..a6da6d58aa7a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -19,6 +19,8 @@ package org.apache.paimon; import org.apache.paimon.CoreOptions.ExternalPathStrategy; +import org.apache.paimon.catalog.RenamingSnapshotCommit; +import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.fs.FileIO; @@ -31,6 +33,7 @@ import org.apache.paimon.metastore.AddPartitionTagCallback; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; +import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.ManifestsReader; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; @@ -261,7 +264,14 @@ public FileStoreCommitImpl newCommit(String commitUser) { @Override public FileStoreCommitImpl newCommit(String commitUser, List callbacks) { + SnapshotManager snapshotManager = snapshotManager(); + SnapshotCommit snapshotCommit = catalogEnvironment.snapshotCommit(snapshotManager); + if (snapshotCommit == null) { + snapshotCommit = + new RenamingSnapshotCommit(snapshotManager, Lock.emptyFactory().create()); + } return new FileStoreCommitImpl( + snapshotCommit, fileIO, schemaManager, tableName, @@ -270,7 +280,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List cal options, options.partitionDefaultName(), pathFactory(), - snapshotManager(), + snapshotManager, manifestFileFactory(), manifestListFactory(), indexManifestFileFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index f766df0c5caf..aa93b1ba3256 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,7 +24,6 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; @@ -367,9 +366,10 @@ protected abstract void alterTableImpl(Identifier identifier, List @Override public Table getTable(Identifier identifier) throws TableNotExistException { - Lock.Factory lockFactory = - Lock.factory(lockFactory().orElse(null), lockContext().orElse(null), identifier); - return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, lockFactory); + SnapshotCommit.Factory commitFactory = + new RenamingSnapshotCommit.Factory( + lockFactory().orElse(null), lockContext().orElse(null)); + return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 3ceef461e873..fbd510692c30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -23,7 +23,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; -import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaManager; @@ -173,7 +172,7 @@ public static Table loadTable( Catalog catalog, Identifier identifier, TableMetadata.Loader metadataLoader, - Lock.Factory lockFactory) + SnapshotCommit.Factory commitFactory) throws Catalog.TableNotExistException { if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog); @@ -188,7 +187,7 @@ public static Table loadTable( CatalogEnvironment catalogEnv = new CatalogEnvironment( - identifier, metadata.uuid(), lockFactory, catalog.catalogLoader()); + identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory); Path path = new Path(schema.options().get(PATH.key())); FileStoreTable table = FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index f8e9a2aabc4d..38b1214829f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; @@ -117,20 +118,30 @@ protected void dropTableImpl(Identifier identifier) { @Override public void createTableImpl(Identifier identifier, Schema schema) { - uncheck(() -> schemaManager(identifier).createTable(schema)); + SchemaManager schemaManager = schemaManager(identifier); + try { + runWithLock(identifier, () -> uncheck(() -> schemaManager.createTable(schema))); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } } - private SchemaManager schemaManager(Identifier identifier) { - Path path = getTableLocation(identifier); - CatalogLock catalogLock = - lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null); - return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()) - .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); + public T runWithLock(Identifier identifier, Callable callable) throws Exception { + Optional lockFactory = lockFactory(); + try (Lock lock = + lockFactory + .map(factory -> factory.createLock(lockContext().orElse(null))) + .map(l -> Lock.fromCatalog(l, identifier)) + .orElseGet(() -> Lock.emptyFactory().create())) { + return lock.runWithLock(callable); + } } - private CatalogLockContext assertGetLockContext() { - return lockContext() - .orElseThrow(() -> new RuntimeException("No lock context when lock is enabled.")); + private SchemaManager schemaManager(Identifier identifier) { + Path path = getTableLocation(identifier); + return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()); } @Override @@ -143,7 +154,17 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - schemaManager(identifier).commitChanges(changes); + SchemaManager schemaManager = schemaManager(identifier); + try { + runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + } catch (TableNotExistException + | ColumnAlreadyExistException + | ColumnNotExistException + | RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } } protected static T uncheck(Callable callable) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java new file mode 100644 index 000000000000..40411b9ac1ad --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; +import org.apache.paimon.utils.SnapshotManager; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.concurrent.Callable; + +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; + +/** + * A {@link SnapshotCommit} using file renaming to commit. + * + *

Note that when the file system is local or HDFS, rename is atomic. But if the file system is + * object storage, we need additional lock protection. + */ +public class RenamingSnapshotCommit implements SnapshotCommit { + + private final SnapshotManager snapshotManager; + private final FileIO fileIO; + private final Lock lock; + + public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) { + this.snapshotManager = snapshotManager; + this.fileIO = snapshotManager.fileIO(); + this.lock = lock; + } + + @Override + public boolean commit(Snapshot snapshot, @Nullable String branch) throws Exception { + Path newSnapshotPath = + branch == null || branch.equals(DEFAULT_MAIN_BRANCH) + ? snapshotManager.snapshotPath(snapshot.id()) + : snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id()); + + Callable callable = + () -> { + boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson()); + if (committed) { + snapshotManager.commitLatestHint(snapshot.id()); + } + return committed; + }; + return lock.runWithLock( + () -> + // fs.rename may not returns false if target file + // already exists, or even not atomic + // as we're relying on external locking, we can first + // check if file exist then rename to work around this + // case + !fileIO.exists(newSnapshotPath) && callable.call()); + } + + @Override + public void close() throws Exception { + this.lock.close(); + } + + /** Factory to create {@link RenamingSnapshotCommit}. */ + public static class Factory implements SnapshotCommit.Factory { + + private static final long serialVersionUID = 1L; + + @Nullable private final CatalogLockFactory lockFactory; + @Nullable private final CatalogLockContext lockContext; + + public Factory( + @Nullable CatalogLockFactory lockFactory, + @Nullable CatalogLockContext lockContext) { + this.lockFactory = lockFactory; + this.lockContext = lockContext; + } + + @Override + public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) { + Lock lock = + Optional.ofNullable(lockFactory) + .map(factory -> factory.createLock(lockContext)) + .map(l -> Lock.fromCatalog(l, identifier)) + .orElseGet(() -> Lock.emptyFactory().create()); + return new RenamingSnapshotCommit(snapshotManager, lock); + } + + @VisibleForTesting + @Nullable + public CatalogLockFactory lockFactory() { + return lockFactory; + } + + @VisibleForTesting + @Nullable + public CatalogLockContext lockContext() { + return lockContext; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java new file mode 100644 index 000000000000..a9b7864956ed --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.utils.SnapshotManager; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Interface to commit snapshot atomically. */ +public interface SnapshotCommit extends AutoCloseable { + + boolean commit(Snapshot snapshot, @Nullable String branch) throws Exception; + + /** Factory to create {@link SnapshotCommit}. */ + interface Factory extends Serializable { + SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 8db9e723c295..1a5bd7d12569 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; @@ -289,7 +290,8 @@ protected void dropTableImpl(Identifier identifier) { protected void createTableImpl(Identifier identifier, Schema schema) { try { // create table file - getSchemaManager(identifier).createTable(schema); + SchemaManager schemaManager = getSchemaManager(identifier); + runWithLock(identifier, () -> schemaManager.createTable(schema)); // Update schema metadata Path path = getTableLocation(identifier); int insertRecord = @@ -350,10 +352,19 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + throws ColumnAlreadyExistException, TableNotExistException, ColumnNotExistException { assertMainBranch(identifier); SchemaManager schemaManager = getSchemaManager(identifier); - schemaManager.commitChanges(changes); + try { + runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + } catch (TableNotExistException + | ColumnAlreadyExistException + | ColumnNotExistException + | RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e); + } } @Override @@ -384,9 +395,9 @@ public Optional lockContext() { return Optional.of(new JdbcCatalogLockContext(catalogKey, options)); } - private Lock lock(Identifier identifier) { + public T runWithLock(Identifier identifier, Callable callable) throws Exception { if (!lockEnabled()) { - return new Lock.EmptyLock(); + return callable.call(); } JdbcCatalogLock lock = new JdbcCatalogLock( @@ -394,7 +405,7 @@ private Lock lock(Identifier identifier) { catalogKey, checkMaxSleep(options.toMap()), acquireTimeout(options.toMap())); - return Lock.fromCatalog(lock, identifier); + return Lock.fromCatalog(lock, identifier).runWithLock(callable); } @Override @@ -403,7 +414,7 @@ public void close() throws Exception { } private SchemaManager getSchemaManager(Identifier identifier) { - return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier)); + return new SchemaManager(fileIO, getTableLocation(identifier)); } private Map fetchProperties(String databaseName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 43456cbe7184..7d87aff87a1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -32,9 +32,6 @@ /** Commit operation which provides commit and overwrite. */ public interface FileStoreCommit extends AutoCloseable { - /** With global lock. */ - FileStoreCommit withLock(Lock lock); - FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit); FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 0b4783b16579..7a264a41bf11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -21,10 +21,10 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileEntry; @@ -75,7 +75,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -85,7 +84,6 @@ import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; /** @@ -94,12 +92,12 @@ *

This class provides an atomic commit method to the user. * *

    - *
  1. Before calling {@link FileStoreCommitImpl#commit}, if user cannot determine if this commit - * is done before, user should first call {@link FileStoreCommitImpl#filterCommitted}. + *
  2. Before calling {@link #commit(ManifestCommittable, Map)}, if user cannot determine if this + * commit is done before, user should first call {@link #filterCommitted}. *
  3. Before committing, it will first check for conflicts by checking if all files to be removed * currently exists, and if modified files have overlapping key ranges with existing files. - *
  4. After that it use the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic - * rename of the file system to ensure atomicity. + *
  5. After that it use the external {@link SnapshotCommit} (if provided) or the atomic rename of + * the file system to ensure atomicity. *
  6. If commit fails due to conflicts or exception it tries its best to clean up and aborts. *
  7. If atomic rename fails it tries again after reading the latest snapshot from step 2. *
@@ -112,6 +110,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class); + private final SnapshotCommit snapshotCommit; private final FileIO fileIO; private final SchemaManager schemaManager; private final String tableName; @@ -135,15 +134,15 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final List commitCallbacks; private final StatsFileHandler statsFileHandler; private final BucketMode bucketMode; - private long commitTimeout; + private final long commitTimeout; private final int commitMaxRetries; - @Nullable private Lock lock; private boolean ignoreEmptyCommit; private CommitMetrics commitMetrics; @Nullable private PartitionExpire partitionExpire; public FileStoreCommitImpl( + SnapshotCommit snapshotCommit, FileIO fileIO, SchemaManager schemaManager, String tableName, @@ -170,6 +169,7 @@ public FileStoreCommitImpl( List commitCallbacks, int commitMaxRetries, long commitTimeout) { + this.snapshotCommit = snapshotCommit; this.fileIO = fileIO; this.schemaManager = schemaManager; this.tableName = tableName; @@ -198,19 +198,12 @@ public FileStoreCommitImpl( this.commitMaxRetries = commitMaxRetries; this.commitTimeout = commitTimeout; - this.lock = null; this.ignoreEmptyCommit = true; this.commitMetrics = null; this.statsFileHandler = statsFileHandler; this.bucketMode = bucketMode; } - @Override - public FileStoreCommit withLock(Lock lock) { - this.lock = lock; - return this; - } - @Override public FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit) { this.ignoreEmptyCommit = ignoreEmptyCommit; @@ -846,10 +839,6 @@ CommitResult tryCommitOnce( long startMillis = System.currentTimeMillis(); long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; - Path newSnapshotPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId); @@ -1016,27 +1005,19 @@ CommitResult tryCommitOnce( cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); throw new RuntimeException( String.format( - "Exception occurs when preparing snapshot #%d (path %s) by user %s " + "Exception occurs when preparing snapshot #%d by user %s " + "with hash %s and kind %s. Clean up.", - newSnapshotId, - newSnapshotPath.toString(), - commitUser, - identifier, - commitKind.name()), + newSnapshotId, commitUser, identifier, commitKind.name()), e); } - if (commitSnapshotImpl(newSnapshot, newSnapshotPath)) { + if (commitSnapshotImpl(newSnapshot)) { if (LOG.isDebugEnabled()) { LOG.debug( String.format( - "Successfully commit snapshot #%d (path %s) by user %s " + "Successfully commit snapshot #%d by user %s " + "with identifier %s and kind %s.", - newSnapshotId, - newSnapshotPath, - commitUser, - identifier, - commitKind.name())); + newSnapshotId, commitUser, identifier, commitKind.name())); } commitCallbacks.forEach(callback -> callback.call(deltaFiles, newSnapshot)); return new SuccessResult(); @@ -1046,15 +1027,10 @@ CommitResult tryCommitOnce( long commitTime = (System.currentTimeMillis() - startMillis) / 1000; LOG.warn( String.format( - "Atomic commit failed for snapshot #%d (path %s) by user %s " + "Atomic commit failed for snapshot #%d by user %s " + "with identifier %s and kind %s after %s seconds. " + "Clean up and try again.", - newSnapshotId, - newSnapshotPath, - commitUser, - identifier, - commitKind.name(), - commitTime)); + newSnapshotId, commitUser, identifier, commitKind.name(), commitTime)); cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); return new RetryResult( deltaManifestList, @@ -1163,12 +1139,7 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la latestSnapshot.watermark(), latestSnapshot.statistics()); - Path newSnapshotPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotManager.snapshotPath(newSnapshot.id()) - : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id()); - - if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) { + if (!commitSnapshotImpl(newSnapshot)) { return new ManifestCompactResult( baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests); } else { @@ -1176,39 +1147,18 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la } } - private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) { + private boolean commitSnapshotImpl(Snapshot newSnapshot) { try { - Callable callable = - () -> { - boolean committed = - fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); - if (committed) { - snapshotManager.commitLatestHint(newSnapshot.id()); - } - return committed; - }; - if (lock != null) { - return lock.runWithLock( - () -> - // fs.rename may not returns false if target file - // already exists, or even not atomic - // as we're relying on external locking, we can first - // check if file exist then rename to work around this - // case - !fileIO.exists(newSnapshotPath) && callable.call()); - } else { - return callable.call(); - } + return snapshotCommit.commit(newSnapshot, branchName); } catch (Throwable e) { // exception when performing the atomic rename, // we cannot clean up because we can't determine the success throw new RuntimeException( String.format( - "Exception occurs when committing snapshot #%d (path %s) by user %s " + "Exception occurs when committing snapshot #%d by user %s " + "with identifier %s and kind %s. " + "Cannot clean up because we can't determine the success.", newSnapshot.id(), - newSnapshotPath, commitUser, newSnapshot.commitIdentifier(), newSnapshot.commitKind().name()), @@ -1505,6 +1455,7 @@ public void close() { for (CommitCallback callback : commitCallbacks) { IOUtils.closeQuietly(callback); } + IOUtils.closeQuietly(snapshotCommit); } private static class LevelIdentifier { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index e4d7352f6966..082572d5c45e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -95,11 +95,6 @@ public PartitionExpire( maxExpireNum); } - public PartitionExpire withLock(Lock lock) { - this.commit.withLock(lock); - return this; - } - public PartitionExpire withMaxExpireNum(int maxExpireNum) { this.maxExpireNum = maxExpireNum; return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 97f9ffe56794..61a010aa07bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,9 +18,9 @@ package org.apache.paimon.rest; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; @@ -29,7 +29,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; @@ -42,6 +41,7 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterPartitionsRequest; import org.apache.paimon.rest.requests.AlterTableRequest; +import org.apache.paimon.rest.requests.CommitTableRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreatePartitionsRequest; import org.apache.paimon.rest.requests.CreateTableRequest; @@ -49,6 +49,7 @@ import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; +import org.apache.paimon.rest.responses.CommitTableResponse; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; @@ -70,6 +71,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -154,7 +157,7 @@ public Map options() { } @Override - public CatalogLoader catalogLoader() { + public RESTCatalogLoader catalogLoader() { return new RESTCatalogLoader(options, fileIO); } @@ -278,9 +281,24 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public Table getTable(Identifier identifier) throws TableNotExistException { - // TODO add lock from server return CatalogUtils.loadTable( - this, identifier, this::loadTableMetadata, Lock.emptyFactory()); + this, + identifier, + this::loadTableMetadata, + new RESTSnapshotCommitFactory(catalogLoader())); + } + + public boolean commitSnapshot( + Identifier identifier, Snapshot snapshot, @Nullable String branch) { + CommitTableRequest request = new CommitTableRequest(identifier, snapshot, branch); + CommitTableResponse response = + client.post( + resourcePaths.commitTable( + identifier.getDatabaseName(), identifier.getTableName()), + request, + CommitTableResponse.class, + headers()); + return response.isSuccess(); } private TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java index b4c5171ab91f..f90988d05cb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java @@ -18,7 +18,6 @@ package org.apache.paimon.rest; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.fs.FileIO; import org.apache.paimon.options.Options; @@ -37,7 +36,7 @@ public RESTCatalogLoader(Options options, FileIO fileIO) { } @Override - public Catalog load() { + public RESTCatalog load() { return new RESTCatalog(options, fileIO); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java new file mode 100644 index 000000000000..51e38b13e47f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.utils.SnapshotManager; + +import javax.annotation.Nullable; + +/** Factory to create {@link SnapshotCommit} for REST Catalog. */ +public class RESTSnapshotCommitFactory implements SnapshotCommit.Factory { + + private static final long serialVersionUID = 1L; + + private final RESTCatalogLoader loader; + + public RESTSnapshotCommitFactory(RESTCatalogLoader loader) { + this.loader = loader; + } + + @Override + public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) { + RESTCatalog catalog = loader.load(); + return new SnapshotCommit() { + @Override + public boolean commit(Snapshot snapshot, @Nullable String branch) { + return catalog.commitSnapshot(identifier, snapshot, branch); + } + + @Override + public void close() throws Exception { + catalog.close(); + } + }; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 7b092196626f..a41dad25df56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -66,6 +66,10 @@ public String renameTable(String databaseName, String tableName) { return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "rename"); } + public String commitTable(String databaseName, String tableName) { + return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "commit"); + } + public String partitions(String databaseName, String tableName) { return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions"); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java new file mode 100644 index 000000000000..2dca5672bc8a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +/** Request for committing snapshot to table. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CommitTableRequest implements RESTRequest { + + private static final String FIELD_IDENTIFIER = "identifier"; + private static final String FIELD_SNAPSHOT = "snapshot"; + private static final String FIELD_BRANCH = "branch"; + + @JsonProperty(FIELD_IDENTIFIER) + private final Identifier identifier; + + @JsonProperty(FIELD_SNAPSHOT) + private final Snapshot snapshot; + + @JsonProperty(FIELD_BRANCH) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final String branch; + + @JsonCreator + public CommitTableRequest( + @JsonProperty(FIELD_IDENTIFIER) Identifier identifier, + @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot, + @JsonProperty(FIELD_BRANCH) @Nullable String branch) { + this.identifier = identifier; + this.snapshot = snapshot; + this.branch = branch; + } + + @JsonGetter(FIELD_IDENTIFIER) + public Identifier getIdentifier() { + return identifier; + } + + @JsonGetter(FIELD_SNAPSHOT) + public Snapshot getSnapshot() { + return snapshot; + } + + @JsonGetter(FIELD_BRANCH) + @Nullable + public String getBranch() { + return branch; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/CommitTableResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/CommitTableResponse.java new file mode 100644 index 000000000000..531b90aa58ba --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/CommitTableResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.rest.RESTResponse; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response for committing snapshot to table. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CommitTableResponse implements RESTResponse { + + private static final String FIELD_SUCCESS = "success"; + + @JsonProperty(FIELD_SUCCESS) + private final boolean success; + + @JsonCreator + public CommitTableResponse(@JsonProperty(FIELD_SUCCESS) boolean success) { + this.success = success; + } + + @JsonGetter(FIELD_SUCCESS) + public boolean isSuccess() { + return success; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index bf75abb7c46d..753bc34d95ef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -25,7 +25,6 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.operation.Lock; import org.apache.paimon.schema.SchemaChange.AddColumn; import org.apache.paimon.schema.SchemaChange.DropColumn; import org.apache.paimon.schema.SchemaChange.RemoveOption; @@ -67,7 +66,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; @@ -91,8 +89,6 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; - @Nullable private transient Lock lock; - private final String branch; public SchemaManager(FileIO fileIO, Path tableRoot) { @@ -110,11 +106,6 @@ public SchemaManager copyWithBranch(String branchName) { return new SchemaManager(fileIO, tableRoot, branchName); } - public SchemaManager withLock(@Nullable Lock lock) { - this.lock = lock; - return this; - } - public Optional latest() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) @@ -782,12 +773,7 @@ boolean commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); - Callable callable = - () -> fileIO.tryToWriteAtomic(schemaPath, newSchema.toString()); - if (lock == null) { - return callable.call(); - } - return lock.runWithLock(callable); + return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString()); } /** Read schema for schema id. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index ea10e824d505..b23e50de19dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -451,7 +451,6 @@ public TableCommitImpl newCommit(String commitUser) { snapshotExpire, options.writeOnly() ? null : store().newPartitionExpire(commitUser), options.writeOnly() ? null : store().newTagCreationManager(), - catalogEnvironment.lockFactory().create(), CoreOptions.fromMap(options()).consumerExpireTime(), new ConsumerManager(fileIO, path, snapshotManager().branch()), options.snapshotExpireExecutionMode(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index 855ccf934695..9ae7a8b02876 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -18,9 +18,11 @@ package org.apache.paimon.table; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.operation.Lock; +import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.utils.SnapshotManager; import javax.annotation.Nullable; @@ -33,22 +35,22 @@ public class CatalogEnvironment implements Serializable { @Nullable private final Identifier identifier; @Nullable private final String uuid; - private final Lock.Factory lockFactory; @Nullable private final CatalogLoader catalogLoader; + @Nullable private final SnapshotCommit.Factory commitFactory; public CatalogEnvironment( @Nullable Identifier identifier, @Nullable String uuid, - Lock.Factory lockFactory, - @Nullable CatalogLoader catalogLoader) { + @Nullable CatalogLoader catalogLoader, + @Nullable SnapshotCommit.Factory commitFactory) { this.identifier = identifier; this.uuid = uuid; - this.lockFactory = lockFactory; this.catalogLoader = catalogLoader; + this.commitFactory = commitFactory; } public static CatalogEnvironment empty() { - return new CatalogEnvironment(null, null, Lock.emptyFactory(), null); + return new CatalogEnvironment(null, null, null, null); } @Nullable @@ -61,8 +63,12 @@ public String uuid() { return uuid; } - public Lock.Factory lockFactory() { - return lockFactory; + @Nullable + public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) { + if (commitFactory == null) { + return null; + } + return commitFactory.create(identifier, snapshotManager); } @Nullable @@ -72,4 +78,9 @@ public PartitionHandler partitionHandler() { } return PartitionHandler.create(catalogLoader.load(), identifier); } + + @VisibleForTesting + public SnapshotCommit.Factory commitFactory() { + return commitFactory; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 73c55942a56a..9f965892f419 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -28,12 +28,10 @@ import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.ExecutorThreadFactory; -import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; @@ -72,13 +70,13 @@ /** An abstraction layer above {@link FileStoreCommit} to provide snapshot commit and expiration. */ public class TableCommitImpl implements InnerTableCommit { + private static final Logger LOG = LoggerFactory.getLogger(TableCommitImpl.class); private final FileStoreCommit commit; @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; @Nullable private final TagAutoManager tagAutoManager; - private final Lock lock; @Nullable private final Duration consumerExpireTime; private final ConsumerManager consumerManager; @@ -97,15 +95,12 @@ public TableCommitImpl( @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, @Nullable TagAutoManager tagAutoManager, - Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, ExpireExecutionMode expireExecutionMode, String tableName, boolean forceCreatingSnapshot) { - commit.withLock(lock); if (partitionExpire != null) { - partitionExpire.withLock(lock); commit.withPartitionExpire(partitionExpire); } @@ -113,7 +108,6 @@ public TableCommitImpl( this.expireSnapshots = expireSnapshots; this.partitionExpire = partitionExpire; this.tagAutoManager = tagAutoManager; - this.lock = lock; this.consumerExpireTime = consumerExpireTime; this.consumerManager = consumerManager; @@ -355,7 +349,6 @@ public void expireSnapshots() { @Override public void close() throws Exception { commit.close(); - IOUtils.closeQuietly(lock); expireMainExecutor.shutdownNow(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java index 8e7679a5ade0..74d79d452aef 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java @@ -133,7 +133,7 @@ public void close() throws Exception {} }; CatalogEnvironment env = - new CatalogEnvironment(null, null, Lock.emptyFactory(), null) { + new CatalogEnvironment(null, null, null, null) { @Override public PartitionHandler partitionHandler() { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 1c802cf4c0ef..6ee90f0f7e5b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -22,17 +22,21 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.RenamingSnapshotCommit; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.requests.AlterPartitionsRequest; import org.apache.paimon.rest.requests.AlterTableRequest; +import org.apache.paimon.rest.requests.CommitTableRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreatePartitionsRequest; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.requests.DropPartitionsRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameTableRequest; +import org.apache.paimon.rest.responses.CommitTableResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; @@ -120,6 +124,8 @@ public MockResponse dispatch(RecordedRequest request) { boolean isTable = resources.length == 3 && "tables".equals(resources[1]); boolean isTableRename = resources.length == 4 && "rename".equals(resources[3]); + boolean isTableCommit = + resources.length == 4 && "commit".equals(resources[3]); boolean isPartitions = resources.length == 4 && "tables".equals(resources[1]) @@ -176,6 +182,9 @@ public MockResponse dispatch(RecordedRequest request) { } else if (isTableRename) { return renameTableApiHandler( catalog, request, databaseName, resources[2]); + } else if (isTableCommit) { + return commitTableApiHandler( + catalog, request, databaseName, resources[2]); } else if (isTable) { String tableName = resources[2]; return tableApiHandler(catalog, request, databaseName, tableName); @@ -271,6 +280,20 @@ private static MockResponse renameTableApiHandler( return mockResponse(response, 200); } + private static MockResponse commitTableApiHandler( + Catalog catalog, RecordedRequest request, String databaseName, String tableName) + throws Exception { + CommitTableRequest requestBody = + OBJECT_MAPPER.readValue(request.getBody().readUtf8(), CommitTableRequest.class); + FileStoreTable table = + (FileStoreTable) catalog.getTable(Identifier.create(databaseName, tableName)); + RenamingSnapshotCommit commit = + new RenamingSnapshotCommit(table.snapshotManager(), Lock.emptyFactory().create()); + boolean success = commit.commit(requestBody.getSnapshot(), requestBody.getBranch()); + CommitTableResponse response = new CommitTableResponse(success); + return mockResponse(response, 200); + } + private static MockResponse databasesApiHandler(Catalog catalog, RecordedRequest request) throws Exception { RESTResponse response; diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 438c6971aa8e..0df07014bd26 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -92,6 +92,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.function.Function; import java.util.stream.Collectors; @@ -107,8 +108,6 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; -import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; -import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; @@ -935,7 +934,14 @@ protected void createTableImpl(Identifier identifier, Schema schema) { boolean externalTable = pair.getRight(); TableSchema tableSchema; try { - tableSchema = schemaManager(identifier, location).createTable(schema, externalTable); + tableSchema = + runWithLock( + identifier, + () -> + schemaManager(identifier, location) + .createTable(schema, externalTable)); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); } @@ -1068,9 +1074,19 @@ protected void alterTableImpl(Identifier identifier, List changes) throw new UnsupportedOperationException("Only data table support alter table."); } - final SchemaManager schemaManager = schemaManager(identifier, getTableLocation(identifier)); - // first commit changes to underlying files - TableSchema schema = schemaManager.commitChanges(changes); + SchemaManager schemaManager = schemaManager(identifier, getTableLocation(identifier)); + TableSchema schema; + try { + // first commit changes to underlying files + schema = runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + } catch (TableNotExistException + | ColumnAlreadyExistException + | ColumnNotExistException + | RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e); + } // currently only changes to main branch affects metastore if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) { @@ -1474,18 +1490,20 @@ private FieldSchema convertToFieldSchema(DataField dataField) { } private SchemaManager schemaManager(Identifier identifier, Path location) { - return new SchemaManager(fileIO, location, identifier.getBranchNameOrDefault()) - .withLock(lock(identifier)); + return new SchemaManager(fileIO, location, identifier.getBranchNameOrDefault()); } - private Lock lock(Identifier identifier) { + public T runWithLock(Identifier identifier, Callable callable) throws Exception { if (!lockEnabled()) { - return new Lock.EmptyLock(); + return callable.call(); } HiveCatalogLock lock = - new HiveCatalogLock(clients, checkMaxSleep(hiveConf), acquireTimeout(hiveConf)); - return Lock.fromCatalog(lock, identifier); + new HiveCatalogLock( + clients, + HiveCatalogLock.checkMaxSleep(hiveConf), + HiveCatalogLock.acquireTimeout(hiveConf)); + return Lock.fromCatalog(lock, identifier).runWithLock(callable); } public static HiveConf createHiveConf( diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index e890d0ebf2ad..a9e6c01e39ed 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.DelegateCatalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.hive.annotation.Minio; import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner; @@ -1098,8 +1099,11 @@ public void testHiveLock() throws InterruptedException, Catalog.TableNotExistExc tEnv.executeSql("CREATE TABLE t (a INT)"); Catalog catalog = ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog(); - FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "t")); + Identifier identifier = new Identifier("test_db", "t"); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); CatalogEnvironment catalogEnv = table.catalogEnvironment(); + RenamingSnapshotCommit.Factory factory = + (RenamingSnapshotCommit.Factory) catalogEnv.commitFactory(); AtomicInteger count = new AtomicInteger(0); List threads = new ArrayList<>(); @@ -1114,7 +1118,11 @@ public void testHiveLock() throws InterruptedException, Catalog.TableNotExistExc Thread thread = new Thread( () -> { - Lock lock = catalogEnv.lockFactory().create(); + Lock lock = + Lock.fromCatalog( + factory.lockFactory() + .createLock(factory.lockContext()), + identifier); for (int j = 0; j < 10; j++) { try { lock.runWithLock(unsafeIncrement); diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml index 41d2632454db..a42313266ebd 100644 --- a/paimon-open-api/rest-catalog-open-api.yaml +++ b/paimon-open-api/rest-catalog-open-api.yaml @@ -406,6 +406,54 @@ paths: $ref: '#/components/schemas/ErrorResponse' "500": description: Internal Server Error + /v1/{prefix}/databases/{database}/tables/{table}/commit: + post: + tags: + - table + summary: Commit table + operationId: commitTable + parameters: + - name: prefix + in: path + required: true + schema: + type: string + - name: database + in: path + required: true + schema: + type: string + - name: table + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CommitTableRequest' + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/CommitTableResponse' + "404": + description: Resource not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + "409": + description: Resource has exist + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + "500": + description: Internal Server Error /v1/{prefix}/databases/{database}/tables/{table}/partitions: get: tags: @@ -900,6 +948,27 @@ components: properties: newIdentifier: $ref: '#/components/schemas/Identifier' + CommitTableRequest: + type: object + properties: + identifier: + $ref: '#/components/schemas/Identifier' + snapshot: + $ref: '#/components/schemas/Snapshot' + branch: + type: string + Snapshot: + type: object + properties: + version: + type: integer + id: + type: integer + CommitTableResponse: + type: object + properties: + success: + type: boolean AlterDatabaseRequest: type: object properties: diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java index a9f3d02f5442..745a1e9ffd34 100644 --- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java +++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java @@ -23,6 +23,7 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterPartitionsRequest; import org.apache.paimon.rest.requests.AlterTableRequest; +import org.apache.paimon.rest.requests.CommitTableRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreatePartitionsRequest; import org.apache.paimon.rest.requests.CreateTableRequest; @@ -30,6 +31,7 @@ import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; +import org.apache.paimon.rest.responses.CommitTableResponse; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; @@ -360,6 +362,30 @@ public GetTableResponse renameTable( "comment")); } + @Operation( + summary = "Commit table", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = {@Content(schema = @Schema(implementation = CommitTableResponse.class))}), + @ApiResponse( + responseCode = "404", + description = "Resource not found", + content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @PostMapping("/v1/{prefix}/databases/{database}/tables/{table}/commit") + public CommitTableResponse commitTable( + @PathVariable String prefix, + @PathVariable String database, + @PathVariable String table, + @RequestBody CommitTableRequest request) { + return new CommitTableResponse(true); + } + @Operation( summary = "List partitions", tags = {"partition"}) From 100c330a586034b22b61c9a89eb5b01cec7bda0c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 14 Jan 2025 23:47:51 +0800 Subject: [PATCH 2/2] fix --- .../org/apache/paimon/catalog/Identifier.java | 2 +- .../catalog/RenamingSnapshotCommit.java | 6 ++---- .../apache/paimon/catalog/SnapshotCommit.java | 4 +--- .../org/apache/paimon/rest/RESTCatalog.java | 7 ++----- .../rest/RESTSnapshotCommitFactory.java | 9 +++++---- .../rest/requests/CommitTableRequest.java | 19 +------------------ .../apache/paimon/rest/RESTCatalogServer.java | 6 +++++- paimon-open-api/rest-catalog-open-api.yaml | 2 -- 8 files changed, 17 insertions(+), 38 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java index ac6996821b03..7b932f5ee565 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java @@ -92,7 +92,7 @@ public Identifier( this.database = database; StringBuilder builder = new StringBuilder(table); - if (branch != null) { + if (branch != null && !"main".equalsIgnoreCase(branch)) { builder.append(Catalog.SYSTEM_TABLE_SPLITTER) .append(Catalog.SYSTEM_BRANCH_PREFIX) .append(branch); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java index 40411b9ac1ad..5575252c05af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java @@ -30,8 +30,6 @@ import java.util.Optional; import java.util.concurrent.Callable; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; - /** * A {@link SnapshotCommit} using file renaming to commit. * @@ -51,9 +49,9 @@ public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) { } @Override - public boolean commit(Snapshot snapshot, @Nullable String branch) throws Exception { + public boolean commit(Snapshot snapshot, String branch) throws Exception { Path newSnapshotPath = - branch == null || branch.equals(DEFAULT_MAIN_BRANCH) + snapshotManager.branch().equals(branch) ? snapshotManager.snapshotPath(snapshot.id()) : snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id()); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java index a9b7864956ed..1f472464c850 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java @@ -21,14 +21,12 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.utils.SnapshotManager; -import javax.annotation.Nullable; - import java.io.Serializable; /** Interface to commit snapshot atomically. */ public interface SnapshotCommit extends AutoCloseable { - boolean commit(Snapshot snapshot, @Nullable String branch) throws Exception; + boolean commit(Snapshot snapshot, String branch) throws Exception; /** Factory to create {@link SnapshotCommit}. */ interface Factory extends Serializable { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 61a010aa07bc..b2bbcbb7673a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -71,8 +71,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -288,9 +286,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { new RESTSnapshotCommitFactory(catalogLoader())); } - public boolean commitSnapshot( - Identifier identifier, Snapshot snapshot, @Nullable String branch) { - CommitTableRequest request = new CommitTableRequest(identifier, snapshot, branch); + public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) { + CommitTableRequest request = new CommitTableRequest(identifier, snapshot); CommitTableResponse response = client.post( resourcePaths.commitTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java index 51e38b13e47f..1a027d41635f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java @@ -23,8 +23,6 @@ import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.utils.SnapshotManager; -import javax.annotation.Nullable; - /** Factory to create {@link SnapshotCommit} for REST Catalog. */ public class RESTSnapshotCommitFactory implements SnapshotCommit.Factory { @@ -41,8 +39,11 @@ public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotMana RESTCatalog catalog = loader.load(); return new SnapshotCommit() { @Override - public boolean commit(Snapshot snapshot, @Nullable String branch) { - return catalog.commitSnapshot(identifier, snapshot, branch); + public boolean commit(Snapshot snapshot, String branch) { + Identifier newIdentifier = + new Identifier( + identifier.getDatabaseName(), identifier.getTableName(), branch); + return catalog.commitSnapshot(newIdentifier, snapshot); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java index 2dca5672bc8a..de8474c1d3eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java @@ -25,18 +25,14 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import javax.annotation.Nullable; - /** Request for committing snapshot to table. */ @JsonIgnoreProperties(ignoreUnknown = true) public class CommitTableRequest implements RESTRequest { private static final String FIELD_IDENTIFIER = "identifier"; private static final String FIELD_SNAPSHOT = "snapshot"; - private static final String FIELD_BRANCH = "branch"; @JsonProperty(FIELD_IDENTIFIER) private final Identifier identifier; @@ -44,19 +40,12 @@ public class CommitTableRequest implements RESTRequest { @JsonProperty(FIELD_SNAPSHOT) private final Snapshot snapshot; - @JsonProperty(FIELD_BRANCH) - @JsonInclude(JsonInclude.Include.NON_NULL) - @Nullable - private final String branch; - @JsonCreator public CommitTableRequest( @JsonProperty(FIELD_IDENTIFIER) Identifier identifier, - @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot, - @JsonProperty(FIELD_BRANCH) @Nullable String branch) { + @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot) { this.identifier = identifier; this.snapshot = snapshot; - this.branch = branch; } @JsonGetter(FIELD_IDENTIFIER) @@ -68,10 +57,4 @@ public Identifier getIdentifier() { public Snapshot getSnapshot() { return snapshot; } - - @JsonGetter(FIELD_BRANCH) - @Nullable - public String getBranch() { - return branch; - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 6ee90f0f7e5b..21284629ccc9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -289,7 +289,11 @@ private static MockResponse commitTableApiHandler( (FileStoreTable) catalog.getTable(Identifier.create(databaseName, tableName)); RenamingSnapshotCommit commit = new RenamingSnapshotCommit(table.snapshotManager(), Lock.emptyFactory().create()); - boolean success = commit.commit(requestBody.getSnapshot(), requestBody.getBranch()); + String branchName = requestBody.getIdentifier().getBranchName(); + if (branchName == null) { + branchName = "main"; + } + boolean success = commit.commit(requestBody.getSnapshot(), branchName); CommitTableResponse response = new CommitTableResponse(success); return mockResponse(response, 200); } diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml index a42313266ebd..e60bb6617767 100644 --- a/paimon-open-api/rest-catalog-open-api.yaml +++ b/paimon-open-api/rest-catalog-open-api.yaml @@ -955,8 +955,6 @@ components: $ref: '#/components/schemas/Identifier' snapshot: $ref: '#/components/schemas/Snapshot' - branch: - type: string Snapshot: type: object properties: