Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon;

import org.apache.paimon.CoreOptions.ExternalPathStrategy;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
Expand All @@ -31,6 +33,7 @@
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
Expand Down Expand Up @@ -261,7 +264,14 @@ public FileStoreCommitImpl newCommit(String commitUser) {

@Override
public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> callbacks) {
SnapshotManager snapshotManager = snapshotManager();
SnapshotCommit snapshotCommit = catalogEnvironment.snapshotCommit(snapshotManager);
if (snapshotCommit == null) {
snapshotCommit =
new RenamingSnapshotCommit(snapshotManager, Lock.emptyFactory().create());
}
return new FileStoreCommitImpl(
snapshotCommit,
fileIO,
schemaManager,
tableName,
Expand All @@ -270,7 +280,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
options,
options.partitionDefaultName(),
pathFactory(),
snapshotManager(),
snapshotManager,
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -367,9 +366,10 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Lock.Factory lockFactory =
Lock.factory(lockFactory().orElse(null), lockContext().orElse(null), identifier);
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, lockFactory);
SnapshotCommit.Factory commitFactory =
new RenamingSnapshotCommit.Factory(
lockFactory().orElse(null), lockContext().orElse(null));
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -173,7 +172,7 @@ public static Table loadTable(
Catalog catalog,
Identifier identifier,
TableMetadata.Loader metadataLoader,
Lock.Factory lockFactory)
SnapshotCommit.Factory commitFactory)
throws Catalog.TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
Expand All @@ -188,7 +187,7 @@ public static Table loadTable(

CatalogEnvironment catalogEnv =
new CatalogEnvironment(
identifier, metadata.uuid(), lockFactory, catalog.catalogLoader());
identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory);
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
Expand Down Expand Up @@ -117,20 +118,30 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
uncheck(() -> schemaManager(identifier).createTable(schema));
SchemaManager schemaManager = schemaManager(identifier);
try {
runWithLock(identifier, () -> uncheck(() -> schemaManager.createTable(schema)));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private SchemaManager schemaManager(Identifier identifier) {
Path path = getTableLocation(identifier);
CatalogLock catalogLock =
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exception {
Optional<CatalogLockFactory> lockFactory = lockFactory();
try (Lock lock =
lockFactory
.map(factory -> factory.createLock(lockContext().orElse(null)))
.map(l -> Lock.fromCatalog(l, identifier))
.orElseGet(() -> Lock.emptyFactory().create())) {
return lock.runWithLock(callable);
}
}

private CatalogLockContext assertGetLockContext() {
return lockContext()
.orElseThrow(() -> new RuntimeException("No lock context when lock is enabled."));
private SchemaManager schemaManager(Identifier identifier) {
Path path = getTableLocation(identifier);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault());
}

@Override
Expand All @@ -143,7 +154,17 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
schemaManager(identifier).commitChanges(changes);
SchemaManager schemaManager = schemaManager(identifier);
try {
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
} catch (TableNotExistException
| ColumnAlreadyExistException
| ColumnNotExistException
| RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected static <T> T uncheck(Callable<T> callable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Identifier(
this.database = database;

StringBuilder builder = new StringBuilder(table);
if (branch != null) {
if (branch != null && !"main".equalsIgnoreCase(branch)) {
builder.append(Catalog.SYSTEM_TABLE_SPLITTER)
.append(Catalog.SYSTEM_BRANCH_PREFIX)
.append(branch);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.concurrent.Callable;

/**
* A {@link SnapshotCommit} using file renaming to commit.
*
* <p>Note that when the file system is local or HDFS, rename is atomic. But if the file system is
* object storage, we need additional lock protection.
*/
public class RenamingSnapshotCommit implements SnapshotCommit {

private final SnapshotManager snapshotManager;
private final FileIO fileIO;
private final Lock lock;

public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) {
this.snapshotManager = snapshotManager;
this.fileIO = snapshotManager.fileIO();
this.lock = lock;
}

@Override
public boolean commit(Snapshot snapshot, String branch) throws Exception {
Path newSnapshotPath =
snapshotManager.branch().equals(branch)
? snapshotManager.snapshotPath(snapshot.id())
: snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id());

Callable<Boolean> callable =
() -> {
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(snapshot.id());
}
return committed;
};
return lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
}

@Override
public void close() throws Exception {
this.lock.close();
}

/** Factory to create {@link RenamingSnapshotCommit}. */
public static class Factory implements SnapshotCommit.Factory {

private static final long serialVersionUID = 1L;

@Nullable private final CatalogLockFactory lockFactory;
@Nullable private final CatalogLockContext lockContext;

public Factory(
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext) {
this.lockFactory = lockFactory;
this.lockContext = lockContext;
}

@Override
public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) {
Lock lock =
Optional.ofNullable(lockFactory)
.map(factory -> factory.createLock(lockContext))
.map(l -> Lock.fromCatalog(l, identifier))
.orElseGet(() -> Lock.emptyFactory().create());
return new RenamingSnapshotCommit(snapshotManager, lock);
}

@VisibleForTesting
@Nullable
public CatalogLockFactory lockFactory() {
return lockFactory;
}

@VisibleForTesting
@Nullable
public CatalogLockContext lockContext() {
return lockContext;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.Snapshot;
import org.apache.paimon.utils.SnapshotManager;

import java.io.Serializable;

/** Interface to commit snapshot atomically. */
public interface SnapshotCommit extends AutoCloseable {

boolean commit(Snapshot snapshot, String branch) throws Exception;

/** Factory to create {@link SnapshotCommit}. */
interface Factory extends Serializable {
SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager);
}
}
25 changes: 18 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
Expand Down Expand Up @@ -289,7 +290,8 @@ protected void dropTableImpl(Identifier identifier) {
protected void createTableImpl(Identifier identifier, Schema schema) {
try {
// create table file
getSchemaManager(identifier).createTable(schema);
SchemaManager schemaManager = getSchemaManager(identifier);
runWithLock(identifier, () -> schemaManager.createTable(schema));
// Update schema metadata
Path path = getTableLocation(identifier);
int insertRecord =
Expand Down Expand Up @@ -350,10 +352,19 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {

@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
throws ColumnAlreadyExistException, TableNotExistException, ColumnNotExistException {
assertMainBranch(identifier);
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
try {
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
} catch (TableNotExistException
| ColumnAlreadyExistException
| ColumnNotExistException
| RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e);
}
}

@Override
Expand Down Expand Up @@ -384,17 +395,17 @@ public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(catalogKey, options));
}

private Lock lock(Identifier identifier) {
public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exception {
if (!lockEnabled()) {
return new Lock.EmptyLock();
return callable.call();
}
JdbcCatalogLock lock =
new JdbcCatalogLock(
connections,
catalogKey,
checkMaxSleep(options.toMap()),
acquireTimeout(options.toMap()));
return Lock.fromCatalog(lock, identifier);
return Lock.fromCatalog(lock, identifier).runWithLock(callable);
}

@Override
Expand All @@ -403,7 +414,7 @@ public void close() throws Exception {
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier));
return new SchemaManager(fileIO, getTableLocation(identifier));
}

private Map<String, String> fetchProperties(String databaseName) {
Expand Down
Loading
Loading