Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.exceptions.UnsupportedOperationException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
Expand Down Expand Up @@ -61,7 +61,7 @@ public void accept(ErrorResponse error) {
case 500:
throw new ServiceFailureException("Server error: %s", message);
case 501:
throw new UnsupportedOperationException(message);
throw new NotImplementedException(message);
case 503:
throw new ServiceUnavailableException("Service unavailable: %s", message);
default:
Expand Down
125 changes: 55 additions & 70 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
Expand Down Expand Up @@ -84,7 +85,6 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
Expand Down Expand Up @@ -392,7 +392,7 @@ public void alterTable(
throw new ColumnAlreadyExistException(identifier, e.resourceName());
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (org.apache.paimon.rest.exceptions.UnsupportedOperationException e) {
} catch (NotImplementedException e) {
throw new UnsupportedOperationException(e.getMessage());
} catch (ServiceFailureException e) {
throw new IllegalStateException(e.getMessage());
Expand Down Expand Up @@ -422,38 +422,35 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
@Override
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
CreatePartitionsRequest request = new CreatePartitionsRequest(partitions);
client.post(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
CreatePartitionsRequest request = new CreatePartitionsRequest(partitions);
client.post(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
DropPartitionsRequest request = new DropPartitionsRequest(partitions);
client.post(
resourcePaths.dropPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
} else {
FileStoreTable fileStoreTable = (FileStoreTable) table;
try {
DropPartitionsRequest request = new DropPartitionsRequest(partitions);
client.post(
resourcePaths.dropPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
FileStoreTable fileStoreTable = (FileStoreTable) getTable(identifier);
try (FileStoreCommit commit =
fileStoreTable
.store()
Expand All @@ -468,65 +465,58 @@ public void dropPartitions(Identifier identifier, List<Map<String, String>> part
@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
AlterPartitionsRequest request = new AlterPartitionsRequest(partitions);
client.post(
resourcePaths.alterPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
AlterPartitionsRequest request = new AlterPartitionsRequest(partitions);
client.post(
resourcePaths.alterPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
MarkDonePartitionsRequest request = new MarkDonePartitionsRequest(partitions);
client.post(
resourcePaths.markDonePartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
MarkDonePartitionsRequest request = new MarkDonePartitionsRequest(partitions);
client.post(
resourcePaths.markDonePartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
Table table = getTable(identifier);
if (!isMetaStorePartitionedTable(table)) {
return listPartitionsFromFileSystem(table);
}

ListPartitionsResponse response;
try {
response =
ListPartitionsResponse response =
client.get(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
ListPartitionsResponse.class,
headers());
if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}
return response.getPartitions();
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (NotImplementedException e) {
// not a metastore partitioned table
return listPartitionsFromFileSystem(getTable(identifier));
}

if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}

return response.getPartitions();
}

@Override
Expand Down Expand Up @@ -626,11 +616,6 @@ public void close() throws Exception {
}
}

private boolean isMetaStorePartitionedTable(Table table) {
Options options = Options.fromMap(table.options());
return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE));
}

private Map<String, String> headers() {
return catalogAuth.getHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.paimon.rest.exceptions;

/** Exception thrown on HTTP 501 - UnsupportedOperationException. */
public class UnsupportedOperationException extends RESTException {
/** Exception thrown on HTTP 501 - NotImplementedException. */
public class NotImplementedException extends RESTException {

public UnsupportedOperationException(String message, Object... args) {
public NotImplementedException(String message, Object... args) {
super(String.format(message, args));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void testHandleErrorResponse() {
ServiceFailureException.class,
() -> defaultErrorHandler.accept(generateErrorResponse(500)));
assertThrows(
org.apache.paimon.rest.exceptions.UnsupportedOperationException.class,
NotImplementedException.class,
() -> defaultErrorHandler.accept(generateErrorResponse(501)));
assertThrows(
RESTException.class, () -> defaultErrorHandler.accept(generateErrorResponse(502)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.rest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
Expand Down Expand Up @@ -68,6 +69,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
Expand Down Expand Up @@ -178,6 +180,11 @@ public MockResponse dispatch(RecordedRequest request) {
if (isDropPartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
DropPartitionsRequest dropPartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -188,6 +195,11 @@ public MockResponse dispatch(RecordedRequest request) {
} else if (isAlterPartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
AlterPartitionsRequest alterPartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -198,6 +210,11 @@ public MockResponse dispatch(RecordedRequest request) {
} else if (isMarkDonePartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
MarkDonePartitionsRequest markDonePartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -207,6 +224,12 @@ public MockResponse dispatch(RecordedRequest request) {
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
String tableName = resources[2];
Optional<MockResponse> error =
checkTablePartitioned(
catalog, Identifier.create(databaseName, tableName));
if (error.isPresent()) {
return error.get();
}
return partitionsApiHandler(catalog, request, databaseName, tableName);
} else if (isTableToken) {
GetTableTokenResponse getTableTokenResponse =
Expand Down Expand Up @@ -326,6 +349,24 @@ public MockResponse dispatch(RecordedRequest request) {
};
}

private static Optional<MockResponse> checkTablePartitioned(
Catalog catalog, Identifier identifier) {
Table table;
try {
table = catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
return Optional.of(
mockResponse(
new ErrorResponse(ErrorResponseResourceType.TABLE, null, "", 404),
404));
}
boolean partitioned = CoreOptions.fromMap(table.options()).partitionedTableInMetastore();
if (!partitioned) {
return Optional.of(mockResponse(new ErrorResponse(null, null, "", 501), 501));
}
return Optional.empty();
}

private static MockResponse commitTableApiHandler(
Catalog catalog, RecordedRequest request, String databaseName, String tableName)
throws Exception {
Expand Down
Loading