Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getTableType;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
Expand Down Expand Up @@ -309,7 +312,7 @@ private void createObjectTable(Identifier identifier, Schema schema) {
ObjectTable.SCHEMA,
rowType);
checkArgument(
schema.options().containsKey(CoreOptions.OBJECT_LOCATION.key()),
schema.options().containsKey(OBJECT_LOCATION.key()),
"Object table should have object-location option.");
createTableImpl(identifier, schema.copy(ObjectTable.SCHEMA));
}
Expand Down Expand Up @@ -383,9 +386,20 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
}

// hive override this method.
protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableMeta tableMeta = getDataTableMeta(identifier);
TableType tableType = getTableType(tableMeta.schema().options());
if (tableType == TableType.FORMAT_TABLE) {
TableSchema schema = tableMeta.schema();
return buildFormatTableByTableSchema(
identifier,
schema.options(),
schema.logicalRowType(),
schema.partitionKeys(),
schema.comment());
}
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
Expand All @@ -399,9 +413,8 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
lockContext().orElse(null),
identifier),
catalogLoader()));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
if (tableType == TableType.OBJECT_TABLE) {
String objectLocation = table.coreOptions().objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -191,4 +194,33 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
}
return partitions;
}

public static TableType getTableType(Map<String, String> options) {
return options.containsKey(CoreOptions.TYPE.key())
? TableType.fromString(options.get(CoreOptions.TYPE.key()))
: CoreOptions.TYPE.defaultValue();
}

public static FormatTable buildFormatTableByTableSchema(
Identifier identifier,
Map<String, String> options,
RowType rowType,
List<String> partitionKeys,
String comment) {
FormatTable.Format format =
FormatTable.parseFormat(
options.getOrDefault(
CoreOptions.FILE_FORMAT.key(),
CoreOptions.FILE_FORMAT.defaultValue()));
String location = options.get(CoreOptions.PATH.key());
return FormatTable.builder()
.identifier(identifier)
.rowType(rowType)
.partitionKeys(partitionKeys)
.location(location)
.format(format)
.options(options)
.comment(comment)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public <T extends RESTResponse> T get(
return exec(request, responseType);
}

@Override
public <T extends RESTResponse> T post(
String path, RESTRequest body, Map<String, String> headers) {
return post(path, body, null, headers);
}

@Override
public <T extends RESTResponse> T post(
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers) {
Expand Down
100 changes: 89 additions & 11 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.rest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand All @@ -29,6 +28,7 @@
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
Expand All @@ -40,9 +40,13 @@
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreatePartitionsRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.DropPartitionsRequest;
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
Expand All @@ -61,6 +65,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

Expand All @@ -79,9 +84,12 @@

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getTableType;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
Expand Down Expand Up @@ -390,32 +398,89 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
@Override
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
throw new UnsupportedOperationException();
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
CreatePartitionsRequest request = new CreatePartitionsRequest(partitions);
client.post(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
}
}

@Override
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
throw new UnsupportedOperationException();
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
DropPartitionsRequest request = new DropPartitionsRequest(partitions);
client.post(
resourcePaths.dropPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
} else {
FileStoreTable fileStoreTable = (FileStoreTable) table;
try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(
fileStoreTable.coreOptions().toConfiguration()))) {
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}
}

@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {
throw new UnsupportedOperationException();
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
AlterPartitionsRequest request = new AlterPartitionsRequest(partitions);
client.post(
resourcePaths.alterPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
}
}

@Override
public void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
throw new UnsupportedOperationException();
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
MarkDonePartitionsRequest request = new MarkDonePartitionsRequest(partitions);
client.post(
resourcePaths.markDonePartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
}
}

@Override
public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
Table table = getTable(identifier);
Options options = Options.fromMap(table.options());
if (!options.get(METASTORE_PARTITIONED_TABLE)) {
if (!isMetaStorePartitionedTable(table)) {
return listPartitionsFromFileSystem(table);
}

Expand Down Expand Up @@ -471,7 +536,16 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}

TableType tableType = getTableType(response.getSchema().options());
if (tableType == TableType.FORMAT_TABLE) {
Schema schema = response.getSchema();
return buildFormatTableByTableSchema(
identifier,
schema.options(),
schema.rowType(),
schema.partitionKeys(),
schema.comment());
}
TableSchema schema = TableSchema.create(response.getSchemaId(), response.getSchema());
FileStoreTable table =
FileStoreTableFactory.create(
Expand All @@ -483,9 +557,8 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
response.getId(),
Lock.emptyFactory(),
catalogLoader()));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
if (tableType == TableType.OBJECT_TABLE) {
String objectLocation = table.coreOptions().objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
Expand All @@ -497,6 +570,11 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
return table;
}

private boolean isMetaStorePartitionedTable(Table table) {
Options options = Options.fromMap(table.options());
return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE));
}

private Map<String, String> headers() {
return catalogAuth.getHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface RESTClient extends Closeable {

<T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers);

<T extends RESTResponse> T post(String path, RESTRequest body, Map<String, String> headers);

<T extends RESTResponse> T post(
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,19 @@ public String renameTable(String databaseName, String tableName) {
public String partitions(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
}

public String dropPartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "drop");
}

public String alterPartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "alter");
}

public String markDonePartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "mark");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,32 @@

package org.apache.paimon.rest.requests;

import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.RESTRequest;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;
import java.util.List;

/** Request for deleting partition. */
/** Request for altering partitions. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class DropPartitionRequest implements RESTRequest {
public class AlterPartitionsRequest implements RESTRequest {

private static final String FIELD_PARTITION_SPEC = "spec";
public static final String FIELD_PARTITIONS = "partitions";

@JsonProperty(FIELD_PARTITION_SPEC)
private final Map<String, String> partitionSpec;
@JsonProperty(FIELD_PARTITIONS)
private final List<Partition> partitions;

@JsonCreator
public DropPartitionRequest(
@JsonProperty(FIELD_PARTITION_SPEC) Map<String, String> partitionSpec) {
this.partitionSpec = partitionSpec;
public AlterPartitionsRequest(@JsonProperty(FIELD_PARTITIONS) List<Partition> partitions) {
this.partitions = partitions;
}

@JsonGetter(FIELD_PARTITION_SPEC)
public Map<String, String> getPartitionSpec() {
return partitionSpec;
@JsonGetter(FIELD_PARTITIONS)
public List<Partition> getPartitions() {
return partitions;
}
}
Loading
Loading