From ac71f9271eae5ec8ccc96477ab373d0c6beaec35 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 23 Dec 2025 12:01:09 +0800 Subject: [PATCH] [feat](Catalog)Support Paimon DLF Catalog Using OSSHDFS Storage (#59245) Support Paimon DLF Catalog Using OSSHDFS Storage (#59245) --- .../PaimonAliyunDLFMetaStoreProperties.java | 11 +--- ...aimonAliyunDLFMetaStorePropertiesTest.java | 58 +++++++++++++++++++ .../oss_hdfs_catalog_test.groovy | 14 +++++ 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java index a3e6c9dd85a189..9bc77d543d3d59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java @@ -18,7 +18,6 @@ package org.apache.doris.datasource.property.metastore; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; -import org.apache.doris.datasource.property.storage.OSSProperties; import org.apache.doris.datasource.property.storage.StorageProperties; import com.aliyun.datalake.metastore.common.DataLakeConfig; @@ -89,15 +88,11 @@ public Catalog initializeCatalog(String catalogName, List sto HiveConf hiveConf = buildHiveConf(); buildCatalogOptions(); StorageProperties ossProps = storagePropertiesList.stream() - .filter(sp -> sp.getType() == StorageProperties.Type.OSS) + .filter(sp -> sp.getType() == StorageProperties.Type.OSS + || sp.getType() == StorageProperties.Type.OSS_HDFS) .findFirst() .orElseThrow(() -> new IllegalStateException("Paimon DLF metastore requires OSS storage properties.")); - - if (!(ossProps instanceof OSSProperties)) { - throw new IllegalStateException("Expected OSSProperties type."); - } - OSSProperties ossProperties = (OSSProperties) ossProps; - hiveConf.addResource(ossProperties.getHadoopStorageConfig()); + ossProps.getHadoopStorageConfig().forEach(entry -> hiveConf.set(entry.getKey(), entry.getValue())); appendUserHadoopConfig(hiveConf); CatalogContext catalogContext = CatalogContext.create(catalogOptions, hiveConf); return CatalogFactory.createCatalog(catalogContext); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStorePropertiesTest.java index f2eade7239c98f..1e02de6a5a43a5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStorePropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStorePropertiesTest.java @@ -100,6 +100,64 @@ void testInitializeCatalogWithValidOssProperties() throws UserException { } } + + @Test + void testInitializeCatalogWithValidOssHdfsProperties() throws UserException { + Map props = createValidProps(); + PaimonAliyunDLFMetaStoreProperties dlfProps = + new PaimonAliyunDLFMetaStoreProperties(props); + dlfProps.initNormalizeAndCheckProps(); + + // Prepare OSSProperties mock + Map ossProps = new HashMap<>(); + ossProps.put("dlf.access_key", "ak"); + ossProps.put("dlf.secret_key", "sk"); + ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com"); + ossProps.put("dlf.region", "cn-beijing"); + ossProps.put("oss.hdfs.enabled", "true"); + + + List storageProperties = StorageProperties.createAll(ossProps); + + Catalog mockCatalog = Mockito.mock(Catalog.class); + + try (MockedStatic mocked = Mockito.mockStatic(CatalogFactory.class)) { + mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class))) + .thenReturn(mockCatalog); + + Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties); + + Assertions.assertNotNull(catalog, "Catalog should not be null"); + Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one"); + + mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class))); + } + ossProps = new HashMap<>(); + ossProps.put("dlf.access_key", "ak"); + ossProps.put("dlf.secret_key", "sk"); + ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com"); + ossProps.put("dlf.region", "cn-beijing"); + ossProps.put("oss.access_key", "ak"); + ossProps.put("oss.secret_key", "sk"); + ossProps.put("oss.endpoint", "oss-cn-beijing.oss-dls.aliyuncs.com"); + storageProperties = StorageProperties.createAll(ossProps); + + mockCatalog = Mockito.mock(Catalog.class); + + try (MockedStatic mocked = Mockito.mockStatic(CatalogFactory.class)) { + mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class))) + .thenReturn(mockCatalog); + + Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties); + + Assertions.assertNotNull(catalog, "Catalog should not be null"); + Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one"); + + mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class))); + } + + } + @Test void testInitializeCatalogWithoutOssPropertiesThrows() { Map props = createValidProps(); diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/oss_hdfs_catalog_test.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/oss_hdfs_catalog_test.groovy index 13efa891a2368a..9f589ab1e2d138 100644 --- a/regression-test/suites/external_table_p2/refactor_catalog_param/oss_hdfs_catalog_test.groovy +++ b/regression-test/suites/external_table_p2/refactor_catalog_param/oss_hdfs_catalog_test.groovy @@ -144,7 +144,20 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") { 'oss.hdfs.endpoint' = '${oss_hdfs_endpoint}', 'oss.hdfs.region'='${oss_hdfs_region}' """ + //**************** Paimon DLF ON OSS_HDFS *******************/ + String query_table_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryTable") + String query_count_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryCount") + String paimon_dlf_old_catalog_properties = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogOldProperties") + String paimon_dlf_new_catalog_properties1 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties1") + String paimon_dlf_new_catalog_properties2 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties2") + + testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,true) + testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,false) + testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,true) + testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,false) + testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,true) + testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,false) //**************** Paimon FILESYSTEM ON OSS_HDFS *******************/ String paimon_fs_warehouse = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfs") @@ -161,6 +174,7 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") { testQuery("paimon_fs_oss_hdfs_region_catalog",paimon_file_system_catalog_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false) testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,true) testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false) + //**************** ICEBERG FILESYSTEM ON OSS_HDFS *******************/ String iceberg_file_system_catalog_properties = """ 'type'='iceberg',