diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 7ff5b31c99bf..871b1a657fdb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -206,32 +206,35 @@ Table buildPaimonTable(DynamicTableFactory.Context context) { // dynamic options should override origin options newOptions.putAll(dynamicOptions); - FileStoreTable fileStoreTable; - if (origin instanceof DataCatalogTable) { - fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table(); - } else if (flinkCatalog == null) { - // In case Paimon is directly used as a Flink connector, instead of through catalog. - fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context)); + Table table; + if (origin instanceof FormatCatalogTable) { + table = ((FormatCatalogTable) origin).table(); } else { - // In cases like materialized table, the Paimon table might not be DataCatalogTable, - // but can still be acquired through the catalog. - Identifier identifier = - Identifier.create( - context.getObjectIdentifier().getDatabaseName(), - context.getObjectIdentifier().getObjectName()); - try { - fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier); - } catch (Catalog.TableNotExistException e) { - throw new RuntimeException(e); + FileStoreTable fileStoreTable; + if (origin instanceof DataCatalogTable) { + fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table(); + } else if (flinkCatalog == null) { + // In case Paimon is directly used as a Flink connector, instead of through catalog. + fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context)); + } else { + // In cases like materialized table, the Paimon table might not be DataCatalogTable, + // but can still be acquired through the catalog. + Identifier identifier = + Identifier.create( + context.getObjectIdentifier().getDatabaseName(), + context.getObjectIdentifier().getObjectName()); + try { + fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } } + table = fileStoreTable.copyWithoutTimeTravel(newOptions); } - FileStoreTable table = fileStoreTable.copyWithoutTimeTravel(newOptions); - if (Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) { Map runtimeContext = getAllOptions(context); table.fileIO().setRuntimeContext(runtimeContext); } - // notice that the Paimon table schema must be the same with the Flink's Schema schema = FlinkCatalog.fromCatalogTable(context.getCatalogTable()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index d5c1ed043b56..bd24ef7dac0d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -52,10 +52,6 @@ public String factoryIdentifier() { @Override public DynamicTableSource createDynamicTableSource(Context context) { - CatalogTable table = context.getCatalogTable().getOrigin(); - if (table instanceof FormatCatalogTable) { - return ((FormatCatalogTable) table).createTableSource(context); - } createTableIfNeeded(context); return super.createDynamicTableSource(context); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java index 34df2f566d36..7ee09b27ec1a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java @@ -23,7 +23,6 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.logical.RowType; @@ -128,17 +127,6 @@ public Optional getDetailedDescription() { return getDescription(); } - public DynamicTableSource createTableSource(DynamicTableFactory.Context context) { - return FactoryUtil.createDynamicTableSource( - null, - context.getObjectIdentifier(), - context.getCatalogTable(), - new HashMap<>(), - context.getConfiguration(), - context.getClassLoader(), - context.isTemporary()); - } - public DynamicTableSink createTableSink(DynamicTableFactory.Context context) { return FactoryUtil.createDynamicTableSink( null, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java index 7da91520dd34..adfa219a6583 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java @@ -50,7 +50,7 @@ public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase { protected RESTCatalogServer restCatalogServer; private String serverUrl; - private String dataPath; + protected String dataPath; protected String warehouse; @TempDir java.nio.file.Path tempFile; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java new file mode 100644 index 000000000000..2cf159cfb385 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.RESTCatalogITCaseBase; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SupportsDirectWrite; +import org.apache.paimon.format.parquet.ParquetFileFormatFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.RESTToken; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for format table. */ +public class FormatTableReadITCaseBase extends RESTCatalogITCaseBase { + + @Test + public void testParquetFileFormat() throws IOException { + FileFormatFactory formatFactory = new ParquetFileFormatFactory(); + RowType rowType = + RowType.builder() + .field("a", DataTypes.INT()) + .field("b", DataTypes.INT()) + .field("c", DataTypes.INT()) + .build(); + FormatWriterFactory factory = + (formatFactory.create( + new FileFormatFactory.FormatContext(new Options(), 1024, 1024))) + .createWriterFactory(rowType); + InternalRow[] datas = new InternalRow[2]; + datas[0] = GenericRow.of(1, 1, 1); + datas[1] = GenericRow.of(2, 2, 2); + String tableName = "format_table_test"; + sql( + "CREATE TABLE %s (a INT, b INT, c INT) WITH ('file.format'='parquet', 'type'='format-table')", + tableName); + write( + factory, + new Path(dataPath, String.format("default.db/%s/data-1.parquet", tableName)), + datas); + RESTToken expiredDataToken = + new RESTToken( + ImmutableMap.of( + "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()), + System.currentTimeMillis() + 1000_000); + Identifier identifier = Identifier.create("default", tableName); + restCatalogServer.setDataToken(identifier, expiredDataToken); + assertThat(sql("SELECT a FROM %s", tableName)) + .containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + sql("Drop TABLE %s", tableName); + } + + protected void write(FormatWriterFactory factory, Path file, InternalRow... rows) + throws IOException { + FileIO fileIO = new ResolvingFileIO(); + Options catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.WAREHOUSE, dataPath); + CatalogContext catalogContext = CatalogContext.create(catalogOptions); + fileIO.configure(catalogContext); + FormatWriter writer; + PositionOutputStream out = null; + if (factory instanceof SupportsDirectWrite) { + writer = ((SupportsDirectWrite) factory).create(fileIO, file, "zstd"); + } else { + out = fileIO.newOutputStream(file, true); + writer = factory.create(out, "zstd"); + } + for (InternalRow row : rows) { + writer.addElement(row); + } + writer.close(); + if (out != null) { + out.close(); + } + } +}