Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.ManifestsReader;
Expand All @@ -45,6 +44,7 @@
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
Expand Down Expand Up @@ -345,11 +345,9 @@ public PartitionExpire newPartitionExpire(String commitUser) {
return null;
}

MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
MetastoreClient metastoreClient = null;
if (options.partitionedTableInMetastore() && metastoreClientFactory != null) {
metastoreClient = metastoreClientFactory.create();
PartitionHandler partitionHandler = null;
if (options.partitionedTableInMetastore()) {
partitionHandler = catalogEnvironment.partitionHandler();
}

return new PartitionExpire(
Expand All @@ -358,7 +356,7 @@ public PartitionExpire newPartitionExpire(String commitUser) {
PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()),
newScan(),
newCommit(commitUser),
metastoreClient,
partitionHandler,
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
}
Expand All @@ -377,11 +375,12 @@ public TagAutoManager newTagCreationManager() {
public List<TagCallback> createTagCallbacks() {
List<TagCallback> callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options));
String partitionField = options.tagToPartitionField();
MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
if (partitionField != null && metastoreClientFactory != null) {
callbacks.add(
new AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));

if (partitionField != null) {
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
if (partitionHandler != null) {
callbacks.add(new AddPartitionTagCallback(partitionHandler, partitionField));
}
}
if (options.tagCreateSuccessFile()) {
callbacks.add(new SuccessFileTagCallback(fileIO, newTagManager().tagDirectory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -410,7 +409,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier).orElse(null)));
catalogLoader()));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
Expand Down Expand Up @@ -477,11 +476,6 @@ protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExist
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;

/** Get metastore client factory for the table specified by {@code identifier}. */
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
return Optional.empty();
}

public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -53,60 +54,45 @@
/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {

private final Options options;

private final Duration expirationInterval;
private final int snapshotMaxNumPerTable;
private final long cachedPartitionMaxNum;

protected final Cache<String, Database> databaseCache;
protected final Cache<Identifier, Table> tableCache;
protected Cache<String, Database> databaseCache;
protected Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;

// partition cache will affect data latency
@Nullable protected final Cache<Identifier, List<Partition>> partitionCache;

public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CACHE_PARTITION_MAX_NUM.defaultValue(),
CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue());
}

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
cachedPartitionMaxNum,
snapshotMaxNumPerTable,
Ticker.systemTicker());
}
@Nullable protected Cache<Identifier, List<Partition>> partitionCache;

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable,
Ticker ticker) {
public CachingCatalog(Catalog wrapped, Options options) {
super(wrapped);
this.options = options;
MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
long manifestCacheThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
Optional<MemorySize> maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) {
// cache all manifest files
manifestMaxMemory = maxMemory.get();
manifestCacheThreshold = Long.MAX_VALUE;
}

this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
}
this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);

this.expirationInterval = expirationInterval;
this.snapshotMaxNumPerTable = snapshotMaxNumPerTable;
this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
init(Ticker.systemTicker());
}

@VisibleForTesting
void init(Ticker ticker) {
this.databaseCache =
Caffeine.newBuilder()
.softValues()
Expand All @@ -121,7 +107,6 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.partitionCache =
cachedPartitionMaxNum == 0
? null
Expand All @@ -142,21 +127,12 @@ public static Catalog tryToCreate(Catalog catalog, Options options) {
return catalog;
}

MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
long manifestThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
Optional<MemorySize> maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) {
// cache all manifest files
manifestMaxMemory = maxMemory.get();
manifestThreshold = Long.MAX_VALUE;
}
return new CachingCatalog(
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
manifestThreshold,
options.get(CACHE_PARTITION_MAX_NUM),
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE));
return new CachingCatalog(catalog, options);
}

@Override
public CatalogLoader catalogLoader() {
return new CachingCatalogLoader(wrapped.catalogLoader(), options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.options.Options;

/** Loader to create {@link CachingCatalog}. */
public class CachingCatalogLoader implements CatalogLoader {

private static final long serialVersionUID = 1L;

private final CatalogLoader catalogLoader;
private final Options options;

public CachingCatalogLoader(CatalogLoader catalogLoader, Options options) {
this.catalogLoader = catalogLoader;
this.options = options;
}

@Override
public Catalog load() {
return CachingCatalog.tryToCreate(catalogLoader.load(), options);
}
}
75 changes: 42 additions & 33 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,6 @@
@Public
public interface Catalog extends AutoCloseable {

// constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";

// constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";

// constants for database
String DEFAULT_DATABASE = "default";
String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";

// constants for table
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";

/** Warehouse root path for creating new databases. */
String warehouse();

/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();

// ======================= database methods ===============================

/**
Expand Down Expand Up @@ -399,6 +366,48 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}

// ==================== Catalog Information ==========================

/** Warehouse root path for creating new databases. */
String warehouse();

/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

/** Serializable loader to create catalog. */
CatalogLoader catalogLoader();

/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();

// ======================= Constants ===============================

// constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";

// constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";

// constants for database
String DEFAULT_DATABASE = "default";
String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";

// constants for table
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";

// ======================= Exceptions ===============================

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@

package org.apache.paimon.catalog;

import org.apache.paimon.annotation.Public;

import java.io.Serializable;

/** Loader for creating a {@link Catalog}. */
/**
* Loader for creating a {@link Catalog}.
*
* @since 1.1.0
*/
@Public
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @since

@FunctionalInterface
public interface CatalogLoader extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;

/** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
public class DelegateCatalog implements Catalog {
public abstract class DelegateCatalog implements Catalog {

protected final Catalog wrapped;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public String warehouse() {
return warehouse.toString();
}

@Override
public CatalogLoader catalogLoader() {
return new FileSystemCatalogLoader(fileIO, warehouse, catalogOptions);
}

@Override
public boolean caseSensitive() {
return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
Expand Down
Loading
Loading