Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d432640
add IT to flink when use RESTCatalog
jerry-024 Dec 31, 2024
90c9144
add IT
jerry-024 Jan 6, 2025
f9d9803
update
jerry-024 Jan 6, 2025
85d3f4a
support use MockRESTCatalogServer mock RESTCatalog server
jerry-024 Jan 7, 2025
d00623a
add get database IT
jerry-024 Jan 7, 2025
f5037c4
add IT for auth fail
jerry-024 Jan 7, 2025
da4d47a
add create table in streaming mode IT
jerry-024 Jan 8, 2025
7675c93
move MockRESTCatalogServer to rest package and add RESTCatalogMockSer…
jerry-024 Jan 8, 2025
bbe60b3
add IT RESTCatalogITCase
jerry-024 Jan 8, 2025
f3c1403
delete no need test
jerry-024 Jan 8, 2025
aafa477
fix fail ut in RESTCatalogMockServerTest
jerry-024 Jan 8, 2025
d6039fb
fix alter table case fail in RESTCatalogMockServerTest
jerry-024 Jan 8, 2025
685224e
fix renameTable fail
jerry-024 Jan 8, 2025
10159cf
fix all fail case in RESTCatalogMockServerTest
jerry-024 Jan 8, 2025
c56388c
Merge branch 'master' into rest-catalog
jerry-024 Jan 9, 2025
1df6389
delete no need update
jerry-024 Jan 9, 2025
8e08f04
add partition test and auth token test
jerry-024 Jan 9, 2025
9317d61
add alter table IT case for RESTCatalog
jerry-024 Jan 9, 2025
df7ef44
format
jerry-024 Jan 9, 2025
554eb94
update rest uri prefix to paimon
jerry-024 Jan 9, 2025
1fc3b37
fix check style fail
jerry-024 Jan 9, 2025
feef4c2
update alter database test and add alter column type in RESTCatalogIT…
jerry-024 Jan 9, 2025
c2e604d
Merge branch 'master' into rest-catalog
jerry-024 Jan 9, 2025
3b0b9ea
delete mock in HttpClientTest
jerry-024 Jan 9, 2025
dcd9d10
fix compile error
jerry-024 Jan 9, 2025
94ef732
fix list tables in RESTCatalog
jerry-024 Jan 9, 2025
609ee96
add test for get table from system database
jerry-024 Jan 9, 2025
356aa63
add ErrorResponseResourceType to define resource type when fail in rest
jerry-024 Jan 9, 2025
8416959
fix compile error
jerry-024 Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ under the License.

<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
<okhttp.version>4.12.0</okhttp.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
Expand Down Expand Up @@ -504,17 +505,6 @@ private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

private void validateAutoCreateClose(Map<String, String> options) {
checkArgument(
!Boolean.parseBoolean(
options.getOrDefault(
CoreOptions.AUTO_CREATE.key(),
CoreOptions.AUTO_CREATE.defaultValue().toString())),
String.format(
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private void validateCustomTablePath(Map<String, String> options) {
if (!allowCustomTablePath() && options.containsKey(CoreOptions.PATH.key())) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
Expand All @@ -38,6 +39,7 @@
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utils for {@link Catalog}. */
public class CatalogUtils {
Expand Down Expand Up @@ -108,6 +110,17 @@ public static void checkNotBranch(Identifier identifier, String method) {
}
}

public static void validateAutoCreateClose(Map<String, String> options) {
checkArgument(
!Boolean.parseBoolean(
options.getOrDefault(
CoreOptions.AUTO_CREATE.key(),
CoreOptions.AUTO_CREATE.defaultValue().toString())),
String.format(
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

public static Table createSystemTable(Identifier identifier, Table originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.exceptions.UnsupportedOperationException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
Expand All @@ -43,18 +44,20 @@ public void accept(ErrorResponse error) {
String message = error.getMessage();
switch (code) {
case 400:
throw new BadRequestException(String.format("Malformed request: %s", message));
throw new BadRequestException(String.format("%s", message));
case 401:
throw new NotAuthorizedException("Not authorized: %s", message);
case 403:
throw new ForbiddenException("Forbidden: %s", message);
case 404:
throw new NoSuchResourceException("%s", message);
throw new NoSuchResourceException(
error.getResourceType(), error.getResourceName(), "%s", message);
case 405:
case 406:
break;
case 409:
throw new AlreadyExistsException("%s", message);
throw new AlreadyExistsException(
error.getResourceType(), error.getResourceName(), "%s", message);
case 500:
throw new ServiceFailureException("Server error: %s", message);
case 501:
Expand Down
23 changes: 18 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public HttpClient(Options options) {
}

public HttpClient(HttpClientOptions httpClientOptions) {
this.uri = httpClientOptions.uri();
if (httpClientOptions.uri() != null && httpClientOptions.uri().endsWith("/")) {
this.uri = httpClientOptions.uri().substring(0, httpClientOptions.uri().length() - 1);
} else {
this.uri = httpClientOptions.uri();
}
this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = DefaultErrorHandler.getInstance();
}
Expand Down Expand Up @@ -132,10 +136,19 @@ private <T extends RESTResponse> T exec(Request request, Class<T> responseType)
try (Response response = okHttpClient.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error =
new ErrorResponse(
responseBodyStr != null ? responseBodyStr : "response body is null",
response.code());
ErrorResponse error;
try {
error = OBJECT_MAPPER.readValue(responseBodyStr, ErrorResponse.class);
} catch (JsonProcessingException e) {
error =
new ErrorResponse(
null,
null,
responseBodyStr != null
? responseBodyStr
: "response body is null",
response.code());
}
errorHandler.accept(error);
}
if (responseType != null && responseBodyStr != null) {
Expand Down
57 changes: 50 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
Expand All @@ -45,6 +47,7 @@
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
Expand Down Expand Up @@ -75,9 +78,12 @@
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
Expand Down Expand Up @@ -209,7 +215,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
throw new DatabaseNotEmptyException(name);
}
client.delete(resourcePaths.database(name), headers());
} catch (NoSuchResourceException e) {
} catch (NoSuchResourceException | DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
}
Expand Down Expand Up @@ -249,12 +255,19 @@ public void alterDatabase(String name, List<PropertyChange> changes, boolean ign

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
ListTablesResponse response =
client.get(resourcePaths.tables(databaseName), ListTablesResponse.class, headers());
if (response.getTables() != null) {
return response.getTables();
try {
ListTablesResponse response =
client.get(
resourcePaths.tables(databaseName),
ListTablesResponse.class,
headers());
if (response.getTables() != null) {
return response.getTables();
}
return ImmutableList.of();
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
return ImmutableList.of();
}

@Override
Expand All @@ -272,6 +285,9 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
try {
checkNotBranch(identifier, "createTable");
checkNotSystemTable(identifier, "createTable");
validateAutoCreateClose(schema.options());
CreateTableRequest request = new CreateTableRequest(identifier, schema);
client.post(
resourcePaths.tables(identifier.getDatabaseName()),
Expand All @@ -282,12 +298,24 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
}
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
} catch (BadRequestException e) {
throw new RuntimeException(new IllegalArgumentException(e.getMessage()));
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
checkNotBranch(fromTable, "renameTable");
checkNotBranch(toTable, "renameTable");
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
try {
RenameTableRequest request = new RenameTableRequest(toTable);
client.post(
Expand All @@ -311,6 +339,7 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
try {
AlterTableRequest request = new AlterTableRequest(changes);
client.post(
Expand All @@ -320,16 +349,30 @@ public void alterTable(
headers());
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(identifier);
if (e.resourceType() == ErrorResponseResourceType.TABLE) {
throw new TableNotExistException(identifier);
} else if (e.resourceType() == ErrorResponseResourceType.COLUMN) {
throw new ColumnNotExistException(identifier, e.resourceName());
}
}
} catch (AlreadyExistsException e) {
throw new ColumnAlreadyExistException(identifier, e.resourceName());
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (org.apache.paimon.rest.exceptions.UnsupportedOperationException e) {
throw new UnsupportedOperationException(e.getMessage());
} catch (ServiceFailureException e) {
throw new IllegalStateException(e.getMessage());
} catch (BadRequestException e) {
throw new RuntimeException(new IllegalArgumentException(e.getMessage()));
}
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");
try {
client.delete(
resourcePaths.table(identifier.getDatabaseName(), identifier.getTableName()),
Expand Down
54 changes: 14 additions & 40 deletions paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import org.apache.paimon.options.Options;

import java.util.StringJoiner;
import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;

/** Resource paths for REST catalog. */
public class ResourcePaths {

public static final String V1_CONFIG = "/v1/config";
private static final StringJoiner SLASH = new StringJoiner("/");
private static final Joiner SLASH = Joiner.on("/").skipNulls();
private static final String V1 = "/v1";
private static final String DATABASES = "databases";
private static final String TABLES = "tables";

public static final String V1_CONFIG = V1 + "/config";

public static ResourcePaths forCatalogProperties(Options options) {
return new ResourcePaths(options.get(RESTCatalogInternalOptions.PREFIX));
Expand All @@ -39,60 +43,30 @@ public ResourcePaths(String prefix) {
}

public String databases() {
return SLASH.add("v1").add(prefix).add("databases").toString();
return SLASH.join(V1, prefix, DATABASES);
}

public String database(String databaseName) {
return SLASH.add("v1").add(prefix).add("databases").add(databaseName).toString();
return SLASH.join(V1, prefix, DATABASES, databaseName);
}

public String databaseProperties(String databaseName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("properties")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, "properties");
}

public String tables(String databaseName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES);
}

public String table(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName);
}

public String renameTable(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.add("rename")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "rename");
}

public String partitions(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.add("partitions")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class AuthSession {
private volatile Map<String, String> headers;

public AuthSession(Map<String, String> headers, CredentialsProvider credentialsProvider) {
this.headers = headers;
this.credentialsProvider = credentialsProvider;
this.headers = RESTUtil.merge(headers, this.credentialsProvider.authHeader());
}

public static AuthSession fromRefreshCredentialsProvider(
Expand Down
Loading
Loading