Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
SnapshotManager snapshotManager = snapshotManager();
SnapshotCommit snapshotCommit = catalogEnvironment.snapshotCommit(snapshotManager);
if (snapshotCommit == null) {
snapshotCommit =
new RenamingSnapshotCommit(snapshotManager, Lock.emptyFactory().create());
snapshotCommit = new RenamingSnapshotCommit(snapshotManager, Lock.empty());
}
return new FileStoreCommitImpl(
snapshotCommit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exc
lockFactory
.map(factory -> factory.createLock(lockContext().orElse(null)))
.map(l -> Lock.fromCatalog(l, identifier))
.orElseGet(() -> Lock.emptyFactory().create())) {
.orElseGet(Lock::empty)) {
return lock.runWithLock(callable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotMana
Optional.ofNullable(lockFactory)
.map(factory -> factory.createLock(lockContext))
.map(l -> Lock.fromCatalog(l, identifier))
.orElseGet(() -> Lock.emptyFactory().create());
.orElseGet(Lock::empty);
return new RenamingSnapshotCommit(snapshotManager, lock);
}

Expand Down
58 changes: 2 additions & 56 deletions paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,8 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.concurrent.Callable;

/**
Expand All @@ -40,57 +35,8 @@ public interface Lock extends AutoCloseable {
/** Run with lock. */
<T> T runWithLock(Callable<T> callable) throws Exception;

/** A factory to create {@link Lock}. */
interface Factory extends Serializable {
Lock create();
}

static Factory factory(
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext,
Identifier tablePath) {
return lockFactory == null
? new EmptyFactory()
: new LockFactory(lockFactory, lockContext, tablePath);
}

static Factory emptyFactory() {
return new EmptyFactory();
}

/** A {@link Factory} creating lock from catalog. */
class LockFactory implements Factory {

private static final long serialVersionUID = 1L;

private final CatalogLockFactory lockFactory;
private final CatalogLockContext lockContext;
private final Identifier tablePath;

public LockFactory(
CatalogLockFactory lockFactory,
CatalogLockContext lockContext,
Identifier tablePath) {
this.lockFactory = lockFactory;
this.lockContext = lockContext;
this.tablePath = tablePath;
}

@Override
public Lock create() {
return fromCatalog(lockFactory.createLock(lockContext), tablePath);
}
}

/** A {@link Factory} creating empty lock. */
class EmptyFactory implements Factory {

private static final long serialVersionUID = 1L;

@Override
public Lock create() {
return new EmptyLock();
}
static Lock empty() {
return new EmptyLock();
}

/** An empty lock. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private static MockResponse commitTableApiHandler(
FileStoreTable table =
(FileStoreTable) catalog.getTable(Identifier.create(databaseName, tableName));
RenamingSnapshotCommit commit =
new RenamingSnapshotCommit(table.snapshotManager(), Lock.emptyFactory().create());
new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty());
String branchName = requestBody.getIdentifier().getBranchName();
if (branchName == null) {
branchName = "main";
Expand Down
Loading