59 changes: 54 additions & 5 deletions docs/en/docs/lakehouse/multi-catalog/iceberg.md
@@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES (

### Create Catalog based on Iceberg API

Use the Iceberg API to access metadata. Services such as Hadoop File System, Hive, REST, DLF, and Glue are supported as Iceberg catalogs.

#### Hadoop Catalog

A Hadoop catalog keeps Iceberg metadata on the file system itself, so only a `warehouse` location is needed:

```sql
CREATE CATALOG iceberg_hadoop PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type' = 'hadoop',
'warehouse' = 'hdfs://your-host:8020/dir/key'
);
```

For an HDFS cluster with NameNode HA enabled, also pass the nameservice configuration:

```sql
CREATE CATALOG iceberg_hadoop_ha PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type' = 'hadoop',
'warehouse' = 'hdfs://your-nameservice/dir/key',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
```
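
Once created, the catalog can be used like any other Doris catalog. A minimal usage sketch (the database and table names below are hypothetical):

```sql
SWITCH iceberg_hadoop;
SHOW DATABASES;
USE your_db;
SELECT * FROM your_table LIMIT 10;
```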

#### Hive Metastore

@@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES (

`hive.metastore.uris`: the Dataproc Metastore URI. It can be found in the Metastore services console: [Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore).

### Iceberg On Object Storage

If the data is stored on S3, the following parameters can be set in `properties`:

```
"s3.access_key" = "ak"
"s3.secret_key" = "sk"
"s3.endpoint" = "http://endpoint-uri"
"s3.region" = "your-region"
"s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。
"s3.endpoint" = "s3.us-east-1.amazonaws.com"
"s3.region" = "us-east-1"
```
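
These properties are supplied together with the catalog definition. A sketch of an HMS-backed Iceberg catalog whose data lives on S3 (the metastore URI, endpoint, and credentials are placeholders):

```sql
CREATE CATALOG iceberg_on_s3 PROPERTIES (
    'type' = 'iceberg',
    'iceberg.catalog.type' = 'hms',
    'hive.metastore.uris' = 'thrift://your-hms-host:9083',
    's3.access_key' = 'ak',
    's3.secret_key' = 'sk',
    's3.endpoint' = 's3.us-east-1.amazonaws.com',
    's3.region' = 'us-east-1'
);
```

The OSS, COS, and OBS property sets below plug in the same way, with their respective prefixes.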

If the data is stored on Alibaba Cloud OSS:

```
"oss.access_key" = "ak"
"oss.secret_key" = "sk"
"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
"oss.region" = "oss-cn-beijing"
```

If the data is stored on Tencent Cloud COS:

```
"cos.access_key" = "ak"
"cos.secret_key" = "sk"
"cos.endpoint" = "cos.ap-beijing.myqcloud.com"
"cos.region" = "ap-beijing"
```

If the data is stored on Huawei Cloud OBS:

```
"obs.access_key" = "ak"
"obs.secret_key" = "sk"
"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com"
"obs.region" = "cn-north-4"
```

## Column type mapping
61 changes: 55 additions & 6 deletions docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES (

### Create Catalog based on Iceberg API

Use the Iceberg API to access metadata. Services such as Hadoop File System, Hive, REST, Glue, and DLF are supported as Iceberg catalogs.

#### Hadoop Catalog

```sql
CREATE CATALOG iceberg_hadoop PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type' = 'hadoop',
'warehouse' = 'hdfs://your-host:8020/dir/key'
);
```

```sql
CREATE CATALOG iceberg_hadoop_ha PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type' = 'hadoop',
'warehouse' = 'hdfs://your-nameservice/dir/key',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
```

#### Hive Metastore

@@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES (

`hive.metastore.uris`: the URI exposed by the Dataproc Metastore service, available from the Metastore management page: [Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore).

### Iceberg On Object Storage

If the data is stored on S3, the following parameters can be set in `properties`:

```
"s3.access_key" = "ak"
"s3.secret_key" = "sk"
"s3.endpoint" = "http://endpoint-uri"
"s3.region" = "your-region"
"s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。
"s3.endpoint" = "s3.us-east-1.amazonaws.com"
"s3.region" = "us-east-1"
```

If the data is stored on Alibaba Cloud OSS:

```
"oss.access_key" = "ak"
"oss.secret_key" = "sk"
"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
"oss.region" = "oss-cn-beijing"
```

If the data is stored on Tencent Cloud COS:

```
"cos.access_key" = "ak"
"cos.secret_key" = "sk"
"cos.endpoint" = "cos.ap-beijing.myqcloud.com"
"cos.region" = "ap-beijing"
```

If the data is stored on Huawei Cloud OBS:

```
"obs.access_key" = "ak"
"obs.secret_key" = "sk"
"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com"
"obs.region" = "cn-north-4"
```

## Column type mapping

HdfsResource.java:
@@ -54,6 +54,7 @@ public class HdfsResource extends Resource {
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
public static final String HDFS_PREFIX = "hdfs:";
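// Full HDFS URI scheme prefix, e.g. the "hdfs://" in "hdfs://nameservice1/path".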
public static final String HDFS_FILE_PREFIX = "hdfs://";

@SerializedName(value = "properties")
private Map<String, String> properties;

IcebergExternalCatalog.java:
@@ -44,6 +44,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
public static final String ICEBERG_REST = "rest";
public static final String ICEBERG_HMS = "hms";
public static final String ICEBERG_HADOOP = "hadoop";
public static final String ICEBERG_GLUE = "glue";
public static final String ICEBERG_DLF = "dlf";
protected String icebergCatalogType;

The catalog factory's createCatalog method:
@@ -39,6 +39,8 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String
return new IcebergGlueExternalCatalog(catalogId, name, resource, props, comment);
case IcebergExternalCatalog.ICEBERG_DLF:
return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment);
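// 'iceberg.catalog.type' = 'hadoop' selects the file-system-based Hadoop catalog.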
case IcebergExternalCatalog.ICEBERG_HADOOP:
return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment);
default:
throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
+ " value: " + catalogType);

New file: IcebergHadoopExternalCatalog.java
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;

import java.util.HashMap;
import java.util.Map;

public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {

    public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, comment);
        props = PropertyConverter.convertToMetaProperties(props);
        String warehouse = props.get(CatalogProperties.WAREHOUSE_LOCATION);
        Preconditions.checkArgument(StringUtils.isNotEmpty(warehouse),
                "Cannot initialize Iceberg HadoopCatalog because 'warehouse' must not be null or empty");
        // Extract the name service (or host:port) between "hdfs://" and the first "/",
        // e.g. "your-nameservice" from "hdfs://your-nameservice/dir/key".
        String nameService = StringUtils.substringBetween(warehouse, HdfsResource.HDFS_FILE_PREFIX, "/");
        if (StringUtils.isEmpty(nameService)) {
            throw new IllegalArgumentException("Unrecognized 'warehouse' location format"
                    + " because name service is required.");
        }
        catalogProperty = new CatalogProperty(resource, props);
        // Record the default FS derived from the warehouse so bare paths can be resolved later.
        catalogProperty.addProperty(HdfsResource.HADOOP_FS_NAME, HdfsResource.HDFS_FILE_PREFIX + nameService);
    }

    @Override
    protected void initLocalObjectsImpl() {
        icebergCatalogType = ICEBERG_HADOOP;
        HadoopCatalog hadoopCatalog = new HadoopCatalog();
        hadoopCatalog.setConf(getConfiguration());
        // Initialize the Hadoop catalog with the configured warehouse location.
        Map<String, String> catalogProperties = new HashMap<>();
        String warehouse = catalogProperty.getProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
        catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
        hadoopCatalog.initialize(icebergCatalogType, catalogProperties);
        catalog = hadoopCatalog;
    }
}

GsonUtils.java:
@@ -68,6 +68,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
@@ -205,6 +206,7 @@ public class GsonUtils {
.registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());

IcebergScanNode.java:
@@ -21,6 +21,7 @@
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
@@ -102,6 +103,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol
case IcebergExternalCatalog.ICEBERG_REST:
case IcebergExternalCatalog.ICEBERG_DLF:
case IcebergExternalCatalog.ICEBERG_GLUE:
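// Hadoop-catalog tables are likewise read through the Iceberg API.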
case IcebergExternalCatalog.ICEBERG_HADOOP:
source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
break;
default:
@@ -185,15 +187,15 @@ public List<Split> getSplits() throws UserException {
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
HashSet<String> partitionPathSet = new HashSet<>();
String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties()
.getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = normalizeLocation(splitTask.file().path().toString());

// Counts the number of partitions read
if (isPartitionedTable) {
@@ -296,8 +298,21 @@ public TFileType getLocationType() throws UserException {

@Override
public TFileType getLocationType(String location) throws UserException {
final String fLocation = normalizeLocation(location);
return getTFileType(fLocation).orElseThrow(() ->
new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
}

private String normalizeLocation(String location) {
Map<String, String> props = source.getCatalog().getProperties();
String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
// Hadoop-catalog tables may return bare paths without a scheme; prepend the
// configured default FS name so downstream code sees a fully qualified URI.
if (IcebergExternalCatalog.ICEBERG_HADOOP.equalsIgnoreCase(icebergCatalogType)
&& !location.startsWith(HdfsResource.HDFS_PREFIX)) {
String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
location = fsName + location;
}
return location;
}

@Override

New file: regression test expected output
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
2879562

-- !q02 --
1
3
5
6
7
8
11

-- !q03 --
1 Customer#000000001 j5JsirBM9P MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
3 Customer#000000003 fkRGN8n ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
5 Customer#000000005 hwBtxkoBF qSW4KrI CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD

New file: regression test suite (Groovy)
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,external_remote,external_remote_iceberg") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String iceberg_catalog_name = "test_external_iceberg_catalog_hadoop"
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHdfsPort = context.config.otherConfigs.get("extHdfsPort")
sql """drop catalog if exists ${iceberg_catalog_name};"""
sql """
create catalog if not exists ${iceberg_catalog_name} properties (
'type'='iceberg',
'iceberg.catalog.type'='hadoop',
'warehouse' = 'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog'
);
"""

sql """switch ${iceberg_catalog_name};"""
def q01 = {
qt_q01 """ select count(*) from iceberg_hadoop_catalog """
qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """
qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """
}

sql """ use `multi_catalog`; """
q01()
}
}