Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ public StatsFileHandler newStatsFileHandler() {
}

protected ManifestsReader newManifestsReader(boolean forWrite) {
return new ManifestsReader(partitionType, snapshotManager(), manifestListFactory(forWrite));
return new ManifestsReader(
partitionType,
options.partitionDefaultName(),
snapshotManager(),
manifestListFactory(forWrite));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -160,32 +158,11 @@ public Database getDatabase(String name) throws DatabaseNotExistException {
protected abstract Database getDatabaseImpl(String name) throws DatabaseNotExistException;

@Override
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Identifier tableIdentifier =
Identifier.create(identifier.getDatabaseName(), identifier.getTableName());
FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);

if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) {
return;
}

MetastoreClient.Factory metastoreFactory =
table.catalogEnvironment().metastoreClientFactory();
if (metastoreFactory == null) {
throw new UnsupportedOperationException(
"The catalog must have metastore to create partition.");
}

try (MetastoreClient client = metastoreFactory.create()) {
client.addPartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropPartition");
Table table = getTable(identifier);
Expand All @@ -195,11 +172,18 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}

@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {}

@Override
public void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {}

@Override
public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
return listPartitionsFromFileSystem(getTable(identifier));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,18 @@ public List<Partition> listPartitions(Identifier identifier) throws TableNotExis
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
wrapped.dropPartition(identifier, partitions);
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
wrapped.dropPartitions(identifier, partitions);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
}

@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {
wrapped.alterPartitions(identifier, partitions);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
Expand Down
111 changes: 58 additions & 53 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface Catalog extends AutoCloseable {
/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();

// ======================= database methods ===============================

/**
* Get the names of all databases in this catalog.
*
Expand Down Expand Up @@ -139,6 +141,8 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
void alterDatabase(String name, List<PropertyChange> changes, boolean ignoreIfNotExists)
throws DatabaseNotExistException;

// ======================= table methods ===============================

/**
* Return a {@link Table} identified by the given {@link Identifier}.
*
Expand Down Expand Up @@ -231,52 +235,81 @@ void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignor
default void invalidateTable(Identifier identifier) {}

/**
* Create the partition of the specify table.
* Modify an existing table from a {@link SchemaChange}.
*
* <p>NOTE: System tables can not be altered.
*
* @param identifier path of the table to be modified
* @param change the schema change
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
* false, throw an exception, if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
*/
default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
}

// ======================= partition methods ===============================

/**
* Create partitions of the specify table.
*
* <p>Only catalog with metastore can support this method, and only table with
* 'metastore.partitioned-table' can support this method.
*
* @param identifier path of the table to drop partition
* @param partitionSpec the partition to be created
* @param identifier path of the table to create partitions
* @param partitions partitions to be created
* @throws TableNotExistException if the table does not exist
*/
void createPartition(Identifier identifier, Map<String, String> partitionSpec)
void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException;

/**
* Drop the partition of the specify table.
* Drop partitions of the specify table.
*
* @param identifier path of the table to drop partition
* @param partition the partition to be deleted
* @param identifier path of the table to drop partitions
* @param partitions partitions to be deleted
* @throws TableNotExistException if the table does not exist
* @throws PartitionNotExistException if the partition does not exist
*/
void dropPartition(Identifier identifier, Map<String, String> partition)
throws TableNotExistException, PartitionNotExistException;
void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException;

/**
* Get Partition of all partitions of the table.
* Alter partitions of the specify table.
*
* @param identifier path of the table to list partitions
* <p>Only catalog with metastore can support this method, and only table with
* 'metastore.partitioned-table' can support this method.
*
* @param identifier path of the table to alter partitions
* @param partitions partitions to be altered
* @throws TableNotExistException if the table does not exist
*/
List<Partition> listPartitions(Identifier identifier) throws TableNotExistException;
void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException;

/**
* Modify an existing table from a {@link SchemaChange}.
* Mark partitions done of the specify table.
*
* <p>NOTE: System tables can not be altered.
* <p>Only catalog with metastore can support this method, and only table with
* 'metastore.partitioned-table' can support this method.
*
* @param identifier path of the table to be modified
* @param change the schema change
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
* false, throw an exception, if set to true, do nothing.
* @param identifier path of the table to mark done partitions
* @param partitions partitions to be marked done
* @throws TableNotExistException if the table does not exist
*/
default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
}
void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException;

/**
* Get Partition of all partitions of the table.
*
* @param identifier path of the table to list partitions
* @throws TableNotExistException if the table does not exist
*/
List<Partition> listPartitions(Identifier identifier) throws TableNotExistException;

// ======================= view methods ===============================

/**
* Return a {@link View} identified by the given {@link Identifier}.
Expand Down Expand Up @@ -340,6 +373,8 @@ default void renameView(Identifier fromView, Identifier toView, boolean ignoreIf
throw new UnsupportedOperationException();
}

// ======================= repair methods ===============================

/**
* Repair the entire Catalog, repair the metadata in the metastore consistent with the metadata
* in the filesystem, register missing tables in the metastore.
Expand Down Expand Up @@ -508,36 +543,6 @@ public Identifier identifier() {
}
}

/** Exception for trying to operate on a partition that doesn't exist. */
class PartitionNotExistException extends Exception {

private static final String MSG = "Partition %s do not exist in the table %s.";

private final Identifier identifier;

private final Map<String, String> partitionSpec;

public PartitionNotExistException(
Identifier identifier, Map<String, String> partitionSpec) {
this(identifier, partitionSpec, null);
}

public PartitionNotExistException(
Identifier identifier, Map<String, String> partitionSpec, Throwable cause) {
super(String.format(MSG, partitionSpec, identifier.getFullName()), cause);
this.identifier = identifier;
this.partitionSpec = partitionSpec;
}

public Identifier identifier() {
return identifier;
}

public Map<String, String> partitionSpec() {
return partitionSpec;
}
}

/** Exception for trying to alter a column that already exists. */
class ColumnAlreadyExistException extends Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
InternalRowPartitionComputer computer =
new InternalRowPartitionComputer(
options.get(PARTITION_DEFAULT_NAME),
table.rowType(),
table.rowType().project(table.partitionKeys()),
table.partitionKeys().toArray(new String[0]),
options.get(PARTITION_GENERATE_LEGCY_NAME));
List<PartitionEntry> partitionEntries =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,30 @@ public void alterTable(
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}

@Override
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
wrapped.createPartitions(identifier, partitions);
}

@Override
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
wrapped.dropPartitions(identifier, partitions);
}

@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {
wrapped.alterPartitions(identifier, partitions);
}

@Override
public void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
wrapped.markDonePartitions(identifier, partitions);
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
return wrapped.getTable(identifier);
Expand Down Expand Up @@ -152,18 +176,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException {
wrapped.createPartition(identifier, partitions);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
wrapped.dropPartition(identifier, partitions);
}

@Override
public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
return wrapped.listPartitions(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
return this;
}

@Override
public FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions) {
manifestsReader.withPartitionsFilter(partitions);
return this;
}

@Override
public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
manifestsReader.withPartitionFilter(predicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public interface FileStoreScan {

FileStoreScan withPartitionFilter(List<BinaryRow> partitions);

FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions);

FileStoreScan withPartitionFilter(PartitionPredicate predicate);

FileStoreScan withBucket(int bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,29 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;

/** A util class to read manifest files. */
@ThreadSafe
public class ManifestsReader {

private final RowType partitionType;
private final String partitionDefaultValue;
private final SnapshotManager snapshotManager;
private final ManifestList.Factory manifestListFactory;

@Nullable private PartitionPredicate partitionFilter = null;

public ManifestsReader(
RowType partitionType,
String partitionDefaultValue,
SnapshotManager snapshotManager,
ManifestList.Factory manifestListFactory) {
this.partitionType = partitionType;
this.partitionDefaultValue = partitionDefaultValue;
this.snapshotManager = snapshotManager;
this.manifestListFactory = manifestListFactory;
}
Expand All @@ -65,6 +71,11 @@ public ManifestsReader withPartitionFilter(List<BinaryRow> partitions) {
return this;
}

public ManifestsReader withPartitionsFilter(List<Map<String, String>> partitions) {
return withPartitionFilter(
createBinaryPartitions(partitions, partitionType, partitionDefaultValue));
}

public ManifestsReader withPartitionFilter(PartitionPredicate predicate) {
this.partitionFilter = predicate;
return this;
Expand Down
Loading
Loading