Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.datasource.property.metastore;

import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.property.storage.OSSProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.aliyun.datalake.metastore.common.DataLakeConfig;
Expand Down Expand Up @@ -89,15 +88,11 @@ public Catalog initializeCatalog(String catalogName, List<StorageProperties> sto
HiveConf hiveConf = buildHiveConf();
buildCatalogOptions();
StorageProperties ossProps = storagePropertiesList.stream()
.filter(sp -> sp.getType() == StorageProperties.Type.OSS)
.filter(sp -> sp.getType() == StorageProperties.Type.OSS
|| sp.getType() == StorageProperties.Type.OSS_HDFS)
.findFirst()
.orElseThrow(() -> new IllegalStateException("Paimon DLF metastore requires OSS storage properties."));

if (!(ossProps instanceof OSSProperties)) {
throw new IllegalStateException("Expected OSSProperties type.");
}
OSSProperties ossProperties = (OSSProperties) ossProps;
hiveConf.addResource(ossProperties.getHadoopStorageConfig());
ossProps.getHadoopStorageConfig().forEach(entry -> hiveConf.set(entry.getKey(), entry.getValue()));
appendUserHadoopConfig(hiveConf);
CatalogContext catalogContext = CatalogContext.create(catalogOptions, hiveConf);
return CatalogFactory.createCatalog(catalogContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,64 @@ void testInitializeCatalogWithValidOssProperties() throws UserException {
}
}


@Test
void testInitializeCatalogWithValidOssHdfsProperties() throws UserException {
Map<String, String> props = createValidProps();
PaimonAliyunDLFMetaStoreProperties dlfProps =
new PaimonAliyunDLFMetaStoreProperties(props);
dlfProps.initNormalizeAndCheckProps();

// Prepare OSSProperties mock
Map<String, String> ossProps = new HashMap<>();
ossProps.put("dlf.access_key", "ak");
ossProps.put("dlf.secret_key", "sk");
ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
ossProps.put("dlf.region", "cn-beijing");
ossProps.put("oss.hdfs.enabled", "true");


List<StorageProperties> storageProperties = StorageProperties.createAll(ossProps);

Catalog mockCatalog = Mockito.mock(Catalog.class);

try (MockedStatic<CatalogFactory> mocked = Mockito.mockStatic(CatalogFactory.class)) {
mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)))
.thenReturn(mockCatalog);

Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties);

Assertions.assertNotNull(catalog, "Catalog should not be null");
Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one");

mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)));
}
ossProps = new HashMap<>();
ossProps.put("dlf.access_key", "ak");
ossProps.put("dlf.secret_key", "sk");
ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
ossProps.put("dlf.region", "cn-beijing");
ossProps.put("oss.access_key", "ak");
ossProps.put("oss.secret_key", "sk");
ossProps.put("oss.endpoint", "oss-cn-beijing.oss-dls.aliyuncs.com");
storageProperties = StorageProperties.createAll(ossProps);

mockCatalog = Mockito.mock(Catalog.class);

try (MockedStatic<CatalogFactory> mocked = Mockito.mockStatic(CatalogFactory.class)) {
mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)))
.thenReturn(mockCatalog);

Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties);

Assertions.assertNotNull(catalog, "Catalog should not be null");
Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one");

mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)));
}

}

@Test
void testInitializeCatalogWithoutOssPropertiesThrows() {
Map<String, String> props = createValidProps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,20 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") {
'oss.hdfs.endpoint' = '${oss_hdfs_endpoint}',
'oss.hdfs.region'='${oss_hdfs_region}'
"""
//**************** Paimon DLF ON OSS_HDFS *******************/

String query_table_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryTable")
String query_count_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryCount")
String paimon_dlf_old_catalog_properties = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogOldProperties")
String paimon_dlf_new_catalog_properties1 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties1")
String paimon_dlf_new_catalog_properties2 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties2")

testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,true)
testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,false)
testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,true)
testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,false)
testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,true)
testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,false)

//**************** Paimon FILESYSTEM ON OSS_HDFS *******************/
String paimon_fs_warehouse = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfs")
Expand All @@ -161,6 +174,7 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") {
testQuery("paimon_fs_oss_hdfs_region_catalog",paimon_file_system_catalog_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false)
testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,true)
testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false)

//**************** ICEBERG FILESYSTEM ON OSS_HDFS *******************/
String iceberg_file_system_catalog_properties = """
'type'='iceberg',
Expand Down
Loading