diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index e9510e8d552d..d82016f9315b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; +import org.apache.paimon.rest.exceptions.NotImplementedException; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -323,6 +324,8 @@ private static List> toTableAndSnapshots( snapshot = optional.get(); } } catch (Catalog.TableNotExistException ignored) { + } catch (NotImplementedException ignored) { + // does not support supportsVersionManagement for external paimon table } } tableAndSnapshots.add(Pair.of(table, snapshot)); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 0d34e17bc8e2..1bf8ff2f3538 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.PagedList; import org.apache.paimon.Snapshot; +import org.apache.paimon.TableType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; @@ -33,6 +34,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionChange; +import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; import org.apache.paimon.rest.exceptions.AlreadyExistsException; @@ -48,6 +50,7 @@ import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.Instant; import org.apache.paimon.table.Table; @@ -71,12 +74,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BRANCH; import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; @@ -443,7 +448,8 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx checkNotSystemTable(identifier, "createTable"); validateCreateTable(schema); createExternalTablePathIfNotExist(schema); - api.createTable(identifier, schema); + Schema newSchema = inferSchemaIfExternalPaimonTable(schema); + api.createTable(identifier, newSchema); } catch (AlreadyExistsException e) { if (!ignoreIfExists) { throw new TableAlreadyExistException(identifier); @@ -998,4 +1004,30 @@ private void createExternalTablePathIfNotExist(Schema schema) throws IOException } } } + + private Schema inferSchemaIfExternalPaimonTable(Schema schema) throws Exception { + TableType tableType = Options.fromMap(schema.options()).get(TYPE); + String externalLocation = schema.options().get(PATH.key()); + + if (TableType.TABLE.equals(tableType) && Objects.nonNull(externalLocation)) { + Path externalPath = new Path(externalLocation); + SchemaManager schemaManager = + new SchemaManager(fileIOFromOptions(externalPath), externalPath); + Optional latest = schemaManager.latest(); + if (latest.isPresent()) { + // Note we just validate schema here, will not create a new table + schemaManager.createTable(schema, true); + Schema existsSchema = latest.get().toSchema(); + // use `owner` and `path` from the user provide schema + if (Objects.nonNull(schema.options().get(Catalog.OWNER_PROP))) { + existsSchema + .options() + .put(Catalog.OWNER_PROP, schema.options().get(Catalog.OWNER_PROP)); + } + existsSchema.options().put(PATH.key(), schema.options().get(PATH.key())); + return existsSchema; + } + } + return schema; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 51d1218b2ef9..08fc1c9778b0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -683,6 +683,19 @@ private MockResponse getDataTokenHandle(Identifier tableIdentifier) throws Excep } private MockResponse snapshotHandle(Identifier identifier) throws Exception { + if (!tableMetadataStore.containsKey(identifier.getFullName())) { + throw new Catalog.TableNotExistException(identifier); + } + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + if (tableMetadata.isExternal()) { + ErrorResponse response = + new ErrorResponse( + ErrorResponse.RESOURCE_TYPE_TABLE, + identifier.getFullName(), + "external paimon table does not support get table snapshot in rest server", + 501); + return mockResponse(response, 404); + } RESTResponse response; Optional snapshotOptional = Optional.ofNullable(tableLatestSnapshotStore.get(identifier.getFullName())); @@ -714,6 +727,7 @@ private MockResponse listSnapshots(Identifier identifier) throws Exception { } private MockResponse loadSnapshot(Identifier identifier, String version) throws Exception { + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = null; @@ -1279,13 +1293,16 @@ private MockResponse tablesHandle( tableMetadata = createObjectTable(identifier, schema); } else { catalog.createTable(identifier, schema, false); + boolean isExternal = + schema.options() != null + && schema.options().containsKey(PATH.key()); tableMetadata = createTableMetadata( requestBody.getIdentifier(), 0L, requestBody.getSchema(), UUID.randomUUID().toString(), - false); + isExternal); } tableMetadataStore.put( requestBody.getIdentifier().getFullName(), tableMetadata); @@ -1510,10 +1527,16 @@ private MockResponse tableHandle(String method, String data, Identifier identifi alterTableImpl(identifier, requestBody.getChanges()); return new MockResponse().setResponseCode(200); case "DELETE": - try { - catalog.dropTable(identifier, false); - } catch (Exception e) { - System.out.println(e.getMessage()); + if (!tableMetadataStore.containsKey(identifier.getFullName())) { + return new MockResponse().setResponseCode(404); + } + tableMetadata = tableMetadataStore.get(identifier.getFullName()); + if (!tableMetadata.isExternal()) { + try { + catalog.dropTable(identifier, false); + } catch (Exception e) { + System.out.println(e.getMessage()); + } } tableMetadataStore.remove(identifier.getFullName()); tableLatestSnapshotStore.remove(identifier.getFullName()); @@ -1532,7 +1555,7 @@ private MockResponse renameTableHandle(String data) throws Exception { throw new Catalog.TableNoPermissionException(fromTable); } else if (tableMetadataStore.containsKey(fromTable.getFullName())) { TableMetadata tableMetadata = tableMetadataStore.get(fromTable.getFullName()); - if (!isFormatTable(tableMetadata.schema().toSchema())) { + if (!isFormatTable(tableMetadata.schema().toSchema()) && !tableMetadata.isExternal()) { catalog.renameTable(requestBody.getSource(), requestBody.getDestination(), false); } if (tableMetadataStore.containsKey(toTable.getFullName())) { @@ -2066,6 +2089,17 @@ private MockResponse commitSnapshot( Snapshot snapshot, List statistics) throws Catalog.TableNotExistException { + if (!tableMetadataStore.containsKey(identifier.getFullName())) { + throw new Catalog.TableNotExistException(identifier); + } + boolean isExternal = tableMetadataStore.get(identifier.getFullName()).isExternal(); + if (isExternal) { + new ErrorResponse( + ErrorResponse.RESOURCE_TYPE_TABLE, + identifier.getFullName(), + "external paimon table does not support commit in rest server", + 501); + } FileStoreTable table = getFileTable(identifier); if (!tableId.equals(table.catalogEnvironment().uuid())) { throw new Catalog.TableNotExistException(identifier); @@ -2223,7 +2257,10 @@ private MockResponse mockResponse(RESTResponse response, int httpCode) { private TableMetadata createTableMetadata( Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) { Map options = new HashMap<>(schema.options()); - Path path = catalog.getTableLocation(identifier); + Path path = + isExternal && Objects.nonNull(schema.options().get(PATH.key())) + ? new Path(schema.options().get(PATH.key())) + : catalog.getTableLocation(identifier); String restPath = path.toString(); if (this.configResponse .getDefaults() @@ -2261,27 +2298,22 @@ private TableMetadata createObjectTable(Identifier identifier, Schema schema) { return createTableMetadata(identifier, 1L, newSchema, UUID.randomUUID().toString(), false); } - private FileStoreTable getFileTable(Identifier identifier) - throws Catalog.TableNotExistException { - if (tableMetadataStore.containsKey(identifier.getFullName())) { - TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); - TableSchema schema = tableMetadata.schema(); - CatalogEnvironment catalogEnv = - new CatalogEnvironment( - identifier, - tableMetadata.uuid(), - catalog.catalogLoader(), - catalog.lockFactory().orElse(null), - catalog.lockContext().orElse(null), - catalogContext, - false); - Path path = new Path(schema.options().get(PATH.key())); - FileIO dataFileIO = catalog.fileIO(); - FileStoreTable table = - FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv); - return table; - } - throw new Catalog.TableNotExistException(identifier); + private FileStoreTable getFileTable(Identifier identifier) { + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + TableSchema schema = tableMetadata.schema(); + CatalogEnvironment catalogEnv = + new CatalogEnvironment( + identifier, + tableMetadata.uuid(), + catalog.catalogLoader(), + catalog.lockFactory().orElse(null), + catalog.lockContext().orElse(null), + catalogContext, + false); + Path path = new Path(schema.options().get(PATH.key())); + FileIO dataFileIO = catalog.fileIO(); + FileStoreTable table = FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv); + return table; } private static int getMaxResults(Map parameters) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 5d76946023a2..2a7c1945bf72 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.PagedList; import org.apache.paimon.Snapshot; import org.apache.paimon.TableType; @@ -2686,66 +2687,102 @@ public void testCreateExternalTable(@TempDir java.nio.file.Path path) throws Exc Table tableAgain = catalog.getTable(identifier); assertThat(tableAgain).isNotNull(); assertThat(tableAgain.comment()).isEqualTo(Optional.of("External table for testing")); + } + + @Test + public void testCreateExternalTableWithSchemaInference(@TempDir java.nio.file.Path path) + throws Exception { + Path externalTablePath = new Path(path.toString(), "external_table_inference_location"); + DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(), externalTablePath.toString()); + restCatalog.createDatabase("test_schema_inference_db", true); + Identifier identifier = + Identifier.create("test_schema_inference_db", "external_inference_table"); + try { + catalog.dropTable(identifier, true); + } catch (Exception e) { + // Ignore drop errors + } + + createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA); + Schema emptySchema = + new Schema( + Lists.newArrayList(), + Collections.emptyList(), + Collections.emptyList(), + DEFAULT_TABLE_SCHEMA.options(), + ""); + catalog.createTable(identifier, emptySchema, false); - testReadSystemTables(); + Table table = catalog.getTable(identifier); + assertThat(table).isNotNull(); + assertThat(table.rowType().getFieldCount()).isEqualTo(3); + assertThat(table.rowType().getFieldNames()).containsExactly("pk", "col1", "col2"); - // Verify external table path still exists after operations - assertTrue( - fileIO.exists(externalTablePath), - "External table path should still exist after operations"); + Schema clientProvidedSchema = + new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + DEFAULT_TABLE_SCHEMA.options(), + ""); + // schema mismatch should throw an exception + Assertions.assertThrows( + RuntimeException.class, + () -> catalog.createTable(identifier, clientProvidedSchema, false)); + DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key()); + } - // Test dropping external table - data should remain - catalog.dropTable(identifier, false); + @Test + public void testReadSystemTablesWithExternalTable(@TempDir java.nio.file.Path path) + throws Exception { + // Create an external table + Path externalTablePath = new Path(path.toString(), "external_sys_table_location"); + DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(), externalTablePath.toString()); - // Verify external table path still exists after drop (external table behavior) - assertTrue( - fileIO.exists(externalTablePath), - "External table path should still exist after drop"); + restCatalog.createDatabase("test_sys_table_db", true); + Identifier identifier = Identifier.create("test_sys_table_db", "external_sys_table"); - // Clean up try { - fileIO.deleteQuietly(externalTablePath); + catalog.dropTable(identifier, true); } catch (Exception e) { - // Ignore cleanup errors + // Ignore drop errors } - } - private void testReadSystemTables() throws IOException, Catalog.TableNotExistException { + createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA); + catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Test reading system table with external table Identifier allTablesIdentifier = Identifier.create("sys", "tables"); Table allTablesTable = catalog.getTable(allTablesIdentifier); + assertThat(allTablesTable).isNotNull(); - if (allTablesTable != null) { - ReadBuilder allTablesReadBuilder = allTablesTable.newReadBuilder(); - TableRead allTablesRead = allTablesReadBuilder.newRead(); - List allTablesSplits = allTablesReadBuilder.newScan().plan().splits(); + ReadBuilder readBuilder = allTablesTable.newReadBuilder(); + TableRead read = readBuilder.newRead(); + List splits = readBuilder.newScan().plan().splits(); - List allTablesResults = new ArrayList<>(); - for (Split split : allTablesSplits) { - try (RecordReader reader = allTablesRead.createReader(split)) { - reader.forEachRemaining(allTablesResults::add); - } + List results = new ArrayList<>(); + for (Split split : splits) { + try (RecordReader reader = read.createReader(split)) { + reader.forEachRemaining(results::add); } + } - // Verify that our external table appears in ALL_TABLES - assertThat(allTablesResults).isNotEmpty(); - - // Find our external table in the results - boolean foundExternalTable = false; - for (InternalRow row : allTablesResults) { - String tableName = row.getString(1).toString(); // table_name column - String databaseName = row.getString(0).toString(); // database_name column - if ("external_test_table".equals(tableName) - && "test_external_table_db".equals(databaseName)) { - foundExternalTable = true; - // Verify table properties - String tableType = row.getString(2).toString(); // table_type column - assertThat(tableType) - .isEqualTo("table"); // External tables are still MANAGED type - break; - } + // Verify external table appears in system table + assertThat(results).isNotEmpty(); + boolean foundExternalTable = false; + for (InternalRow row : results) { + String databaseName = row.getString(0).toString(); + String tableName = row.getString(1).toString(); + if ("test_sys_table_db".equals(databaseName) + && "external_sys_table".equals(tableName)) { + foundExternalTable = true; + break; } - assertThat(foundExternalTable).isTrue(); } + assertThat(foundExternalTable).isTrue(); + DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key()); } protected void createTable( @@ -2828,7 +2865,9 @@ protected void generateTokenAndWriteToFile(String tokenPath) throws IOException private void createExternalTableDirectory(Path externalTablePath, Schema schema) throws Exception { // Create external table directory structure - FileIO fileIO = FileIO.get(externalTablePath, CatalogContext.create(new Options())); + FileIO fileIO = + FileIO.get( + externalTablePath, CatalogContext.create(new Options(catalog.options()))); // Create the external table directory if (!fileIO.exists(externalTablePath)) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java index 26a631534704..5fd3aaf7480e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java @@ -20,8 +20,12 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; + +import static org.apache.paimon.CoreOptions.PATH; /** * A FileSystemCatalog that supports custom table paths for REST catalog server. This allows REST @@ -37,4 +41,13 @@ public RESTFileSystemCatalog(FileIO fileIO, Path warehouse, CatalogContext conte protected boolean allowCustomTablePath() { return true; } + + @Override + public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + boolean isExternal = schema.options() != null && schema.options().containsKey(PATH.key()); + if (!isExternal) { + super.createTable(identifier, schema, ignoreIfExists); + } + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala new file mode 100644 index 000000000000..1ea9b2ee81d1 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.table + +import org.apache.paimon.catalog.Identifier +import org.apache.paimon.fs.Path +import org.apache.paimon.fs.local.LocalFileIO +import org.apache.paimon.schema.{Schema, SchemaManager} +import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.types.DataTypes +import org.apache.paimon.utils.StringUtils + +import org.apache.spark.sql.Row + +class PaimonExternalTableTest extends PaimonSparkTestWithRestCatalogBase { + + override protected def beforeEach(): Unit = { + super.beforeEach() + sql("USE paimon") + sql("CREATE DATABASE IF NOT EXISTS test_db") + sql("USE test_db") + // Clean up any existing tables from previous test runs + sql("DROP TABLE IF EXISTS external_tbl") + sql("DROP TABLE IF EXISTS managed_tbl") + sql("DROP TABLE IF EXISTS external_tbl_renamed") + sql("DROP TABLE IF EXISTS t1") + sql("DROP TABLE IF EXISTS t2") + } + + test("PaimonExternalTable: create and drop external table") { + withTempDir { + tbLocation => + withTable("external_tbl", "managed_tbl") { + val externalTbLocation = tbLocation.getCanonicalPath + // Ensure table doesn't exist before starting + sql("DROP TABLE IF EXISTS external_tbl") + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + // create external table + sql( + s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$externalTbLocation'") + sql("INSERT INTO external_tbl VALUES (1, 'Alice'), (2, 'Bob')") + checkAnswer( + sql("SELECT * FROM external_tbl ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob")) + ) + + val table = paimonCatalog + .getTable(Identifier.create("test_db", "external_tbl")) + .asInstanceOf[FileStoreTable] + val fileIO = table.fileIO() + val actualTbLocation = table.location() + + // For REST catalog, the path might be managed internally, but the table should still function as external + // Verify that the table has a location and is accessible + assert(actualTbLocation != null, "External table should have a location") + + // Verify data is accessible + assert(fileIO.exists(actualTbLocation), "External table location should exist") + + // drop external table - data should still exist (this is the key characteristic of external tables) + sql("DROP TABLE external_tbl") + assert(fileIO.exists(actualTbLocation), "External table data should exist after drop") + + // Invalidate catalog cache to ensure table is fully removed + try { + paimonCatalog.invalidateTable(Identifier.create("test_db", "external_tbl")) + } catch { + case _: Exception => // Ignore if table doesn't exist in cache + } + + // Wait a bit and ensure table is fully dropped before recreating + Thread.sleep(100) // Give catalog time to fully process the drop + sql("DROP TABLE IF EXISTS external_tbl") + + // Schema already exists in filesystem from initial creation, no need to recreate + // create external table again using the same location - should be able to read existing data + sql( + s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$externalTbLocation'") + checkAnswer( + sql("SELECT * FROM external_tbl ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob")) + ) + + // create managed table for comparison + sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon") + sql("INSERT INTO managed_tbl VALUES (3, 'Charlie')") + val managedTable = paimonCatalog + .getTable(Identifier.create("test_db", "managed_tbl")) + .asInstanceOf[FileStoreTable] + val managedTbLocation = managedTable.location() + + // drop managed table - data should be deleted + sql("DROP TABLE managed_tbl") + assert( + !fileIO.exists(managedTbLocation), + "Managed table data should not exist after drop") + } + } + } + + test("PaimonExternalTable: partitioned external table") { + withTempDir { + tbLocation => + withTable("external_tbl") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("value", DataTypes.DOUBLE()) + .column("dept", DataTypes.STRING()) + .partitionKeys("dept") + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + sql(s""" + |CREATE TABLE external_tbl (id INT, name STRING, value DOUBLE) USING paimon + |PARTITIONED BY (dept STRING) + |LOCATION '$externalTbLocation' + |""".stripMargin) + + sql( + "INSERT INTO external_tbl VALUES " + + "(1, 'Alice', 10.5, 'Engineering')," + + "(2, 'Bob', 20.7, 'Engineering')," + + "(3, 'Charlie', 30.9, 'Sales')," + + "(4, 'David', 25.3, 'Sales')") + + // Test reading all data + checkAnswer( + sql("SELECT * FROM external_tbl ORDER BY id"), + Seq( + Row(1, "Alice", 10.5, "Engineering"), + Row(2, "Bob", 20.7, "Engineering"), + Row(3, "Charlie", 30.9, "Sales"), + Row(4, "David", 25.3, "Sales") + ) + ) + + // Test partition filtering + checkAnswer( + sql("SELECT * FROM external_tbl WHERE dept = 'Engineering' ORDER BY id"), + Seq( + Row(1, "Alice", 10.5, "Engineering"), + Row(2, "Bob", 20.7, "Engineering") + ) + ) + + // Test column projection with partition filtering + checkAnswer( + sql("SELECT name, value FROM external_tbl WHERE dept = 'Sales' ORDER BY id"), + Seq( + Row("Charlie", 30.9), + Row("David", 25.3) + ) + ) + + // Verify this is an external table - drop and check data exists + val table = paimonCatalog + .getTable(Identifier.create("test_db", "external_tbl")) + .asInstanceOf[FileStoreTable] + val fileIO = table.fileIO() + val actualTbLocation = table.location() + + sql("DROP TABLE external_tbl") + assert(fileIO.exists(actualTbLocation), "External table data should exist after drop") + } + } + } + + test("PaimonExternalTable: rename external table") { + withTempDir { + tbLocation => + withTable("external_tbl", "external_tbl_renamed") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + // create external table + sql( + s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$externalTbLocation'") + sql("INSERT INTO external_tbl VALUES (1, 'Alice')") + val originalLocation = paimonCatalog + .getTable(Identifier.create("test_db", "external_tbl")) + .asInstanceOf[FileStoreTable] + .location() + + // rename external table, location should not change + sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed") + checkAnswer( + sql("SELECT * FROM external_tbl_renamed"), + Seq(Row(1, "Alice")) + ) + + val renamedTable = paimonCatalog + .getTable(Identifier.create("test_db", "external_tbl_renamed")) + .asInstanceOf[FileStoreTable] + val renamedLocation = renamedTable.location() + assert( + renamedLocation.toString.equals(originalLocation.toString), + "External table location should not change after rename" + ) + } + } + } + + test("PaimonExternalTable: create external table without schema") { + withTempDir { + tbLocation => + withTable("t1", "t2") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("id", DataTypes.INT()) + .column("pt", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("id") + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + // First create a table with schema and data + sql(s""" + |CREATE TABLE t1 (id INT, pt INT) USING paimon + |PARTITIONED BY (pt) + |TBLPROPERTIES('primary-key' = 'id') + |LOCATION '$externalTbLocation' + |""".stripMargin) + sql("INSERT INTO t1 VALUES (1, 1), (2, 2)") + + // create external table without schema - should infer from existing table + sql(s"CREATE TABLE t2 USING paimon LOCATION '$externalTbLocation'") + checkAnswer( + sql("SELECT * FROM t2 ORDER BY id"), + Seq(Row(1, 1), Row(2, 2)) + ) + + val table2 = + paimonCatalog.getTable(Identifier.create("test_db", "t2")).asInstanceOf[FileStoreTable] + val table2Location = table2.location() + // Verify table2 can access the data from table1's location + assert(table2Location != null, "Table t2 should have a location") + // The key point is that t2 can read data from the same location as t1 + assert(table2.fileIO().exists(table2Location), "Table t2 location should exist") + } + } + } + + test("PaimonExternalTable: create external table on managed table location") { + withTable("external_tbl", "managed_tbl") { + // Create managed table first + sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon") + sql("INSERT INTO managed_tbl VALUES (1, 'Alice'), (2, 'Bob')") + checkAnswer( + sql("SELECT * FROM managed_tbl ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob")) + ) + + val managedTable = paimonCatalog + .getTable(Identifier.create("test_db", "managed_tbl")) + .asInstanceOf[FileStoreTable] + val managedLocation = managedTable.location() + // Extract actual file system path, removing any scheme prefix (e.g., "file:", "rest:") + val tablePath = if (managedLocation.toString.contains(":")) { + // Path has scheme, extract the path part after the first ":" + val parts = managedLocation.toString.split(":", 2) + if (parts.length == 2 && parts(0).equals("file")) { + parts(1) // For file: scheme, use the path directly + } else { + // For other schemes or if parsing fails, try to get canonical path + try { + new java.io.File(managedLocation.toString.replaceFirst("^[^:]+:", "")).getCanonicalPath + } catch { + case _: Exception => managedLocation.toString.replaceFirst("^[^:]+:", "") + } + } + } else { + managedLocation.toString + } + + // For REST catalog, managed table already has schema, no need to create schema again + // Create external table pointing to managed table location + sql(s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$tablePath'") + checkAnswer( + sql("SELECT * FROM external_tbl ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob")) + ) + + val externalTable = paimonCatalog + .getTable(Identifier.create("test_db", "external_tbl")) + .asInstanceOf[FileStoreTable] + assert( + StringUtils.replace(externalTable.location().toString, "file:", "").equals(tablePath), + "External table should point to managed table location" + ) + + // Drop managed table - managed table deletion will delete data files + // since external table points to the same location, data will be deleted + sql("DROP TABLE managed_tbl") + val fileIO = externalTable.fileIO() + assert( + !fileIO.exists(externalTable.location()), + "Data should be deleted after dropping managed table since external table points to managed table location" + ) + + // External table cannot read data anymore since data was deleted with managed table + // This demonstrates that external table pointing to managed table location shares the same data + checkAnswer( + sql("SELECT * FROM external_tbl ORDER BY id"), + Seq.empty + ) + } + } + + test("PaimonExternalTable: insert overwrite on external table") { + withTempDir { + tbLocation => + withTable("external_tbl") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("age", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + sql( + s"CREATE TABLE external_tbl (age INT, name STRING) USING paimon LOCATION '$externalTbLocation'") + + sql("INSERT INTO external_tbl VALUES (5, 'Ben'), (7, 'Larry')") + checkAnswer( + sql("SELECT age, name FROM external_tbl ORDER BY age"), + Seq(Row(5, "Ben"), Row(7, "Larry")) + ) + + sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry'), (7, 'Tom')") + checkAnswer( + sql("SELECT age, name FROM external_tbl ORDER BY age"), + Seq(Row(5, "Jerry"), Row(7, "Tom")) + ) + } + } + } + + test("PaimonExternalTable: insert overwrite on partitioned external table") { + withTempDir { + tbLocation => + withTable("external_tbl") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("age", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("id", DataTypes.INT()) + .partitionKeys("id") + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + sql(s""" + |CREATE TABLE external_tbl (age INT, name STRING) USING paimon + |PARTITIONED BY (id INT) + |LOCATION '$externalTbLocation' + |""".stripMargin) + + sql("INSERT INTO external_tbl PARTITION (id = 1) VALUES (5, 'Ben'), (7, 'Larry')") + sql("INSERT OVERWRITE external_tbl PARTITION (id = 1) VALUES (5, 'Jerry'), (7, 'Tom')") + checkAnswer( + sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"), + Seq(Row(1, 5, "Jerry"), Row(1, 7, "Tom")) + ) + + sql("INSERT INTO external_tbl PARTITION (id = 3) VALUES (5, 'Alice')") + // Use dynamic partition overwrite mode to only overwrite partitions present in data + withSparkSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") { + sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry', 1), (7, 'Tom', 2)") + } + checkAnswer( + sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"), + Seq(Row(1, 5, "Jerry"), Row(2, 7, "Tom"), Row(3, 5, "Alice")) + ) + } + } + } + + test("PaimonExternalTable: show partitions on external table") { + withTempDir { + tbLocation => + withTable("external_tbl") { + val externalTbLocation = tbLocation.getCanonicalPath + + // For REST catalog external tables, schema must be created in filesystem first + val schemaTablePath = new Path(externalTbLocation) + val schemaFileIO = LocalFileIO.create() + val schema = Schema + .newBuilder() + .column("id", DataTypes.INT()) + .column("p1", DataTypes.INT()) + .column("p2", DataTypes.STRING()) + .partitionKeys("p1", "p2") + .option("path", externalTbLocation) + .option("type", "table") + .build() + new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true) + + sql(s""" + |CREATE TABLE external_tbl (id INT, p1 INT, p2 STRING) USING paimon + |PARTITIONED BY (p1, p2) + |LOCATION '$externalTbLocation' + |""".stripMargin) + + sql("INSERT INTO external_tbl VALUES (1, 1, '1')") + sql("INSERT INTO external_tbl VALUES (2, 1, '1')") + sql("INSERT INTO external_tbl VALUES (3, 2, '1')") + sql("INSERT INTO external_tbl VALUES (3, 2, '2')") + + checkAnswer( + sql("SHOW PARTITIONS external_tbl"), + Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2"))) + checkAnswer( + sql("SHOW PARTITIONS external_tbl PARTITION (p1=2)"), + Seq(Row("p1=2/p2=1"), Row("p1=2/p2=2"))) + checkAnswer( + sql("SHOW PARTITIONS external_tbl PARTITION (p1=2, p2='2')"), + Seq(Row("p1=2/p2=2"))) + } + } + } +}