Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand All @@ -39,6 +40,8 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -62,7 +65,10 @@
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -96,16 +102,31 @@ public FileIO fileIO() {
return fileIO;
}

public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactory();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

public Optional<CatalogLockContext> lockContext() {
return CatalogUtils.lockContext(catalogOptions);
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
}

protected boolean lockEnabled() {
return CatalogUtils.lockEnabled(catalogOptions, fileIO);
return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}

protected boolean allowCustomTablePath() {
Expand Down Expand Up @@ -369,13 +390,14 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getTableName();
Table table =
SystemTableLoader.loadGlobal(
tableName, fileIO, this::allTablePaths, catalogOptions);
if (table == null) {
throw new TableNotExistException(identifier);
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths());
case CATALOG_OPTIONS:
return new CatalogOptionsTable(catalogOptions);
default:
throw new TableNotExistException(identifier);
}
return table;
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
Expand All @@ -384,7 +406,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
identifier.getTableName(),
identifier.getBranchName(),
null));
return CatalogUtils.getSystemTable(identifier, originTable);
return CatalogUtils.createSystemTable(identifier, originTable);
} else {
return getDataOrFormatTable(identifier);
}
Expand All @@ -402,8 +424,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
identifier,
tableMeta.uuid,
Lock.factory(
lockFactory(catalogOptions, fileIO(), defaultLockFactory())
.orElse(null),
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier).orElse(null)));
Expand Down Expand Up @@ -447,7 +468,7 @@ public void createFormatTable(Identifier identifier, Schema schema) {
* @return The warehouse path for the database
*/
public Path newDatabasePath(String database) {
return CatalogUtils.newDatabasePath(warehouse(), database);
return newDatabasePath(warehouse(), database);
}

public Map<String, Map<String, Path>> allTablePaths() {
Expand Down Expand Up @@ -490,6 +511,18 @@ protected void assertMainBranch(Identifier identifier) {
}
}

public static Path newTableLocation(String warehouse, Identifier identifier) {
checkNotBranch(identifier, "newTableLocation");
checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getTableName());
}

public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,17 @@

package org.apache.paimon.catalog;

import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;

/** Utils for {@link Catalog}. */
Expand Down Expand Up @@ -97,18 +90,6 @@ public static void checkNotSystemTable(Identifier identifier, String method) {
}
}

public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

public static Path newTableLocation(String warehouse, Identifier identifier) {
checkNotBranch(identifier, "newTableLocation");
checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getTableName());
}

public static void checkNotBranch(Identifier identifier, String method) {
if (identifier.getBranchName() != null) {
throw new IllegalArgumentException(
Expand All @@ -119,32 +100,7 @@ public static void checkNotBranch(Identifier identifier, String method) {
}
}

public static Optional<CatalogLockFactory> lockFactory(
Options options, FileIO fileIO, Optional<CatalogLockFactory> defaultLockFactoryOpt) {
boolean lockEnabled = lockEnabled(options, fileIO);
if (!lockEnabled) {
return Optional.empty();
}

String lock = options.get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactoryOpt;
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

public static Optional<CatalogLockContext> lockContext(Options options) {
return Optional.of(CatalogLockContext.fromOptions(options));
}

public static boolean lockEnabled(Options options, FileIO fileIO) {
return options.getOptional(LOCK_ENABLED).orElse(fileIO != null && fileIO.isObjectStore());
}

public static Table getSystemTable(Identifier identifier, Table originTable)
public static Table createSystemTable(Identifier identifier, Table originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;

/** A catalog implementation for {@link FileIO}. */
Expand Down Expand Up @@ -124,9 +123,7 @@ public void createTableImpl(Identifier identifier, Schema schema) {
private SchemaManager schemaManager(Identifier identifier) {
Path path = getTableLocation(identifier);
CatalogLock catalogLock =
lockFactory(catalogOptions, fileIO(), defaultLockFactory())
.map(fac -> fac.createLock(assertGetLockContext()))
.orElse(null);
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}
Expand Down
Loading
Loading