diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 8fee7366593aee..726f8f331df99e 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -237,6 +237,7 @@ module.exports = [ "logstash", "odbc-of-doris", "hive-of-doris", + "iceberg-of-doris", "plugin-development-manual", "spark-doris-connector", "flink-doris-connector", @@ -629,6 +630,7 @@ module.exports = [ "SHOW SNAPSHOT", "SHOW SYNC JOB", "SHOW TABLES", + "SHOW TABLE CREATION", "SHOW TABLET", "SHOW TRANSACTION", "STOP ROUTINE LOAD", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 0d2e382c6af0b1..ebd0ab8348058b 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -238,6 +238,7 @@ module.exports = [ "logstash", "odbc-of-doris", "hive-of-doris", + "iceberg-of-doris", "plugin-development-manual", "spark-doris-connector", "flink-doris-connector", @@ -631,6 +632,7 @@ module.exports = [ "SHOW SNAPSHOT", "SHOW SYNC JOB", "SHOW TABLES", + "SHOW TABLE CREATION", "SHOW TABLET", "SHOW TRANSACTION", "SPARK LOAD", diff --git a/docs/en/extending-doris/iceberg-of-doris.md b/docs/en/extending-doris/iceberg-of-doris.md new file mode 100644 index 00000000000000..e6a3f9cb522058 --- /dev/null +++ b/docs/en/extending-doris/iceberg-of-doris.md @@ -0,0 +1,146 @@ +--- +{ + "title": "Iceberg of Doris", + "language": "en" +} +--- + + + +# Iceberg External Table of Doris + +Iceberg External Table of Doris provides Doris with the ability to access Iceberg external tables directly, eliminating the need for cumbersome data import and leveraging Doris' own OLAP capabilities to solve Iceberg table data analysis problems. + + 1. support Iceberg data sources to access Doris + 2. Support joint query between Doris and Iceberg data source tables to perform more complex analysis operations + +This document introduces how to use this feature and the considerations. 
+ +## Glossary + +### Noun in Doris + +* FE: Frontend, the front-end node of Doris, responsible for metadata management and request access +* BE: Backend, the backend node of Doris, responsible for query execution and data storage + +## How to use + +### Create Iceberg External Table + +Iceberg tables can be created in Doris in two ways. You do not need to declare the column definitions of the table when creating an external table, Doris can automatically convert them based on the column definitions of the table in Iceberg. + +1. Create a separate external table to mount the Iceberg table. + The syntax can be viewed in `HELP CREATE TABLE`. + + ```sql + -- Syntax + CREATE [EXTERNAL] TABLE table_name + ENGINE = ICEBERG + [COMMENT "comment"] + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "icberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + + -- Example: Mount iceberg_table under iceberg_db in Iceberg + CREATE TABLE `t_iceberg` + ENGINE = ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + +2. Create an Iceberg database to mount the corresponding Iceberg database on the remote side, and mount all the tables under the database. + You can check the syntax with `HELP CREATE DATABASE`. 
+ + ```sql + -- Syntax + CREATE DATABASE db_name + [COMMENT "comment"] + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + -- Example: mount the iceberg_db in Iceberg and mount all tables under that db + CREATE DATABASE `iceberg_test_db` + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + + The progress of the table build in `iceberg_test_db` can be viewed by `HELP SHOW TABLE CREATION`. + +#### Parameter Description + +- ENGINE needs to be specified as ICEBERG +- PROPERTIES property. + - `iceberg.hive.metastore.uris`: Hive Metastore service address + - `iceberg.database`: the name of the database to which Iceberg is mounted + - `iceberg.table`: the name of the table to which Iceberg is mounted, not required when mounting Iceberg database. + - `iceberg.catalog.type`: the catalog method used in Iceberg, the default is `HIVE_CATALOG`, currently only this method is supported, more Iceberg catalog access methods will be supported in the future. + +### Show table structure + +Show table structure can be viewed by `HELP SHOW CREATE TABLE`. + +## Data Type Matching + +The supported Iceberg column types correspond to Doris in the following table. 
+ +| Iceberg | Doris | Description | +| :------: | :----: | :-------------------------------: | +| BOOLEAN | BOOLEAN | | +| INTEGER | INT | | +| LONG | BIGINT | | +| FLOAT | FLOAT | | +| DOUBLE | DOUBLE | | +| DATE | DATE | | +| TIMESTAMP | DATETIME | Timestamp to Datetime with loss of precision | +| STRING | STRING | | +| UUID | VARCHAR | Use VARCHAR instead | +| DECIMAL | DECIMAL | | +| TIME | - | not supported | +| FIXED | - | not supported | +| BINARY | - | not supported | +| STRUCT | - | not supported | +| LIST | - | not supported | +| MAP | - | not supported | + +**Note:** +- Iceberg table Schema changes **are not automatically synchronized** and require rebuilding the Iceberg external tables or database in Doris. +- The current default supported version of Iceberg is 0.12.0 and has not been tested in other versions. More versions will be supported in the future. + +### Query Usage + +Once you have finished building the Iceberg external table in Doris, it is no different from a normal Doris OLAP table except that you cannot use the data models in Doris (rollup, preaggregation, materialized views, etc.) + +```sql +select * from t_iceberg where k1 > 1000 and k3 = 'term' or k4 like '%doris'; +``` diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md index 012aabb97bc42e..7dadd2ef6e8a52 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md @@ -25,14 +25,44 @@ under the License. --> # CREATE DATABASE + ## Description -This statement is used to create a new database -Grammar: -CREATE DATABASE [IF NOT EXISTS] db_name; + + This statement is used to create a new database + Syntax: + CREATE DATABASE [IF NOT EXISTS] db_name + [PROPERTIES ("key"="value", ...)] ; + +1. PROPERTIES + Additional information of a database, can be defaulted. 
+ 1) In case of iceberg, the following information needs to be provided in the properties. + ``` + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ) + + ``` + `iceberg.database` is the name of the database corresponding to Iceberg. + `iceberg.hive.metastore.uris` is the address of the hive metastore service. + `iceberg.catalog.type` defaults to `HIVE_CATALOG`. Currently, only `HIVE_CATALOG` is supported, more Iceberg catalog types will be supported later. ## example -1. New database db_test -CREATE DATABASE db_test; + 1. Create a new database db_test + ``` + CREATE DATABASE db_test; + ``` + + 2. Create a new Iceberg database iceberg_test + ``` + CREATE DATABASE `iceberg_test` + PROPERTIES ( + "iceberg.database" = "doris", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` ## keyword CREATE,DATABASE diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 329b053b541808..5226d234c1010f 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -35,7 +35,7 @@ Syntax: CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name (column_definition1[, column_definition2, ...] [, index_definition1[, ndex_definition12,]]) - [ENGINE = [olap|mysql|broker|hive]] + [ENGINE = [olap|mysql|broker|hive|iceberg]] [key_desc] [COMMENT "table comment"] [partition_desc] @@ -106,7 +106,7 @@ Syntax: Notice: Only support BITMAP index in current version, BITMAP can only apply to single column 3. ENGINE type - Default is olap. Options are: olap, mysql, broker, hive + Default is olap. 
Options are: olap, mysql, broker, hive, iceberg 1) For mysql, properties should include: ``` @@ -156,6 +156,21 @@ Syntax: ) ``` "database" is the name of the database corresponding to the hive table, "table" is the name of the hive table, and "hive.metastore.uris" is the hive metastore service address. + + 4) For iceberg, properties should include: + ``` + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "iceberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ) + + ``` + database is the name of the database corresponding to Iceberg. + table is the name of the table corresponding to Iceberg. + hive.metastore.uris is the address of the hive metastore service. + catalog.type defaults to HIVE_CATALOG. Currently, only HIVE_CATALOG is supported, more Iceberg catalog types will be supported later. 4. key_desc Syntax: @@ -788,6 +803,19 @@ Syntax: ); ``` +17. Create an Iceberg external table + +``` + CREATE TABLE example_db.t_iceberg + ENGINE=ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); +``` + ## keyword CREATE,TABLE diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md new file mode 100644 index 00000000000000..08a730bc7a4190 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md @@ -0,0 +1,82 @@ +--- +{ + "title": "SHOW TABLE CREATION", + "language": "en" +} +--- + + + +# SHOW TABLE CREATION + +## Description + + This statement is used to show the execution of the specified Iceberg Database table creation task + Syntax. + SHOW TABLE CREATION [FROM db_name] [LIKE table_name_wild]; + + Description. + 1. 
Usage Notes + 1) If db_name is not specified, the current default db is used. + 2) If you use LIKE, it will match the table creation task with table_name_wild in the table name + 2. The meaning of each column + 1) Database: the name of the database + 2) Table: the name of the table to be created + 3) Status: the creation status of the table, `success`/`fail`. + 4) CreateTime: the time to perform the task of creating the table + 5) Error Msg: Error message of the failed table creation, or empty if it succeeds. + +## example + + 1. Show all the table creation tasks in the default Iceberg db + SHOW TABLE CREATION; + + mysql> show table creation; + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + | default_cluster:iceberg_db | logs | fail | 2022-01-24 19:42:45 | Cannot convert Iceberg type[list] to Doris type. | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + + 2. Show the table creation tasks in the specified Iceberg db + SHOW TABLE CREATION FROM example_db; + + mysql> show table creation from iceberg_db; + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + | default_cluster:iceberg_db | logs | fail | 2022-01-24 19:42:45 | Cannot convert Iceberg type[list] to Doris type. 
| + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + + 3. Show table creation tasks for the specified Iceberg db with the string "log" in the table name + SHOW TABLE CREATION FROM example_db LIKE '%log%'; + + mysql> show table creation from iceberg_db like "%1"; + +----------------------------+--------+---------+---------------------+-----------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+-----------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + +----------------------------+--------+---------+---------------------+-----------+ + +## keyword + + SHOW,TABLE CREATION diff --git a/docs/zh-CN/extending-doris/iceberg-of-doris.md b/docs/zh-CN/extending-doris/iceberg-of-doris.md new file mode 100644 index 00000000000000..7faeb9e64ec34c --- /dev/null +++ b/docs/zh-CN/extending-doris/iceberg-of-doris.md @@ -0,0 +1,146 @@ +--- +{ + "title": "Iceberg of Doris", + "language": "zh-CN" +} +--- + + + +# Iceberg External Table of Doris + +Iceberg External Table of Doris 提供了 Doris 直接访问 Iceberg 外部表的能力,外部表省去了繁琐的数据导入工作,并借助 Doris 本身的 OLAP 的能力来解决 Iceberg 表的数据分析问题: + + 1. 支持 Iceberg 数据源接入Doris + 2. 支持 Doris 与 Iceberg 数据源中的表联合查询,进行更加复杂的分析操作 + +本文档主要介绍该功能的使用方式和注意事项等。 + +## 名词解释 + +### Doris 相关 + +* FE:Frontend,Doris 的前端节点,负责元数据管理和请求接入 +* BE:Backend,Doris 的后端节点,负责查询执行和数据存储 + +## 使用方法 + +### Doris 中创建 Iceberg 的外表 + +可以通过以下两种方式在 Doris 中创建 Iceberg 外表。建外表时无需声明表的列定义,Doris 可以根据 Iceberg 中表的列定义自动转换。 + +1. 
创建一个单独的外表,用于挂载 Iceberg 表。 + 具体相关语法,可以通过 `HELP CREATE TABLE` 查看。 + + ```sql + -- 语法 + CREATE [EXTERNAL] TABLE table_name + ENGINE = ICEBERG + [COMMENT "comment"] + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "icberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + + -- 例子:挂载 Iceberg 中 iceberg_db 下的 iceberg_table + CREATE TABLE `t_iceberg` + ENGINE = ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + +2. 创建一个 Iceberg 数据库,用于挂载远端对应 Iceberg 数据库,同时挂载该 database 下的所有 table。 + 具体相关语法,可以通过 `HELP CREATE DATABASE` 查看。 + + ```sql + -- 语法 + CREATE DATABASE db_name + [COMMENT "comment"] + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + -- 例子:挂载 Iceberg 中的 iceberg_db,同时挂载该 db 下的所有 table + CREATE DATABASE `iceberg_test_db` + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + + `iceberg_test_db` 中的建表进度可以通过 `HELP SHOW TABLE CREATION` 查看。 + +#### 参数说明: + +- ENGINE 需要指定为 ICEBERG +- PROPERTIES 属性: + - `iceberg.hive.metastore.uris`:Hive Metastore 服务地址 + - `iceberg.database`:挂载 Iceberg 对应的数据库名 + - `iceberg.table`:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。 + - `iceberg.catalog.type`:Iceberg 中使用的 catalog 方式,默认为 `HIVE_CATALOG`,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。 + +### 展示表结构 + +展示表结构可以通过 `HELP SHOW CREATE TABLE` 查看。 + +## 类型匹配 + +支持的 Iceberg 列类型与 Doris 对应关系如下表: + +| Iceberg | Doris | 描述 | +| :------: | :----: | :-------------------------------: | +| BOOLEAN | BOOLEAN | | +| INTEGER | INT | | +| LONG | BIGINT | | +| FLOAT | FLOAT | | +| DOUBLE | DOUBLE | | +| DATE | DATE | | +| 
TIMESTAMP | DATETIME | Timestamp 转成 Datetime 会损失精度 | +| STRING | STRING | | +| UUID | VARCHAR | 使用 VARCHAR 来代替 | +| DECIMAL | DECIMAL | | +| TIME | - | 不支持 | +| FIXED | - | 不支持 | +| BINARY | - | 不支持 | +| STRUCT | - | 不支持 | +| LIST | - | 不支持 | +| MAP | - | 不支持 | + +**注意:** +- Iceberg 表 Schema 变更**不会自动同步**,需要在 Doris 中重建 Iceberg 外表或数据库。 +- 当前默认支持的 Iceberg 版本为 0.12.0,未在其他版本进行测试。后续后支持更多版本。 + +### 查询用法 + +完成在 Doris 中建立 Iceberg 外表后,除了无法使用 Doris 中的数据模型(rollup、预聚合、物化视图等)外,与普通的 Doris OLAP 表并无区别 + +```sql +select * from t_iceberg where k1 > 1000 and k3 ='term' or k4 like '%doris'; +``` diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md index 48e2ef3e901385..1fac750903f909 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE DATABASE.md @@ -25,15 +25,45 @@ under the License. --> # CREATE DATABASE -## description + +## Description + 该语句用于新建数据库(database) 语法: - CREATE DATABASE [IF NOT EXISTS] db_name; + CREATE DATABASE [IF NOT EXISTS] db_name + [PROPERTIES ("key"="value", ...)]; + +1. PROPERTIES + 该数据库的附加信息,可以缺省。 + 1)如果创建 Iceberg 数据库,则需要在 properties 中提供以下信息: + ``` + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ) + + ``` + 其中 `iceberg.database` 是 Iceberg 对应的库名; + `iceberg.hive.metastore.uris` 是 hive metastore 服务地址。 + `iceberg.catalog.type` 默认为 `HIVE_CATALOG`。当前仅支持 `HIVE_CATALOG`,后续会支持更多 Iceberg catalog 类型。 ## example 1. 新建数据库 db_test + ``` CREATE DATABASE db_test; + ``` + 2. 
新建 Iceberg 数据库 iceberg_test + ``` + CREATE DATABASE `iceberg_test` + PROPERTIES ( + "iceberg.database" = "doris", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + ## keyword CREATE,DATABASE diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index bcbd152f46d4b9..d061da8e042338 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -35,7 +35,7 @@ under the License. CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name (column_definition1[, column_definition2, ...] [, index_definition1[, ndex_definition12,]]) - [ENGINE = [olap|mysql|broker|hive]] + [ENGINE = [olap|mysql|broker|hive|iceberg]] [key_desc] [COMMENT "table comment"]; [partition_desc] @@ -114,7 +114,7 @@ under the License. 当前仅支持BITMAP索引, BITMAP索引仅支持应用于单列 3. ENGINE 类型 - 默认为 olap。可选 mysql, broker, hive + 默认为 olap。可选 mysql, broker, hive, iceberg 1) 如果是 mysql,则需要在 properties 提供以下信息: ``` @@ -166,6 +166,22 @@ under the License. ``` 其中 database 是 hive 表对应的库名字,table 是 hive 表的名字,hive.metastore.uris 是 hive metastore 服务地址。 + 4)如果是 iceberg,则需要在 properties 中提供以下信息: + ``` + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "iceberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ) + + ``` + 其中 database 是 Iceberg 对应的库名; + table 是 Iceberg 中对应的表名; + hive.metastore.uris 是 hive metastore 服务地址; + catalog.type 默认为 HIVE_CATALOG。当前仅支持 HIVE_CATALOG,后续会支持更多 Iceberg catalog 类型。 + + 4. key_desc 语法: `key_type(k1[,k2 ...])` @@ -828,6 +844,19 @@ under the License. ); ``` +17. 
创建一个 Iceberg 外表 + +``` + CREATE TABLE example_db.t_iceberg + ENGINE=ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); +``` + ## keyword CREATE,TABLE diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md new file mode 100644 index 00000000000000..9ea1dfdcbf4c11 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TABLE CREATION.md @@ -0,0 +1,82 @@ +--- +{ + "title": "SHOW TABLE CREATION", + "language": "zh-CN" +} +--- + + + +# SHOW TABLE CREATION + +## Description + + 该语句用于展示指定的 Iceberg Database 建表任务的执行情况 + 语法: + SHOW TABLE CREATION [FROM db_name] [LIKE table_name_wild]; + + 说明: + 1. 使用说明 + 1) 如果不指定 db_name,使用当前默认 db + 2) 如果使用 LIKE,则会匹配表名中包含 table_name_wild 的建表任务 + 2. 各列含义说明 + 1) Database: 数据库名称 + 2) Table:要创建表的名称 + 3) Status:表的创建状态,`success`/`fail` + 4) CreateTime:执行创建该表任务的时间 + 5) Error Msg:创建表失败的错误信息,如果成功,则为空。 +## example + + 1. 展示默认 Iceberg db 中所有的建表任务 + SHOW TABLE CREATION; + + mysql> show table creation ; + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + | default_cluster:iceberg_db | logs | fail | 2022-01-24 19:42:45 | Cannot convert Iceberg type[list] to Doris type. | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + + 2. 
展示指定 Iceberg db 中的建表任务 + SHOW TABLE CREATION FROM example_db; + + mysql> show table creation from iceberg_db; + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + | default_cluster:iceberg_db | logs | fail | 2022-01-24 19:42:45 | Cannot convert Iceberg type[list] to Doris type. | + +----------------------------+--------+---------+---------------------+----------------------------------------------------------+ + + 3. 展示指定 Iceberg db 中的建表任务,表名中包含字符串 "log" 的任务 + SHOW TABLE CREATION FROM example_db LIKE '%log%'; + + mysql> show table creation from iceberg_db like "%1"; + +----------------------------+--------+---------+---------------------+-----------+ + | Database | Table | Status | Create Time | Error Msg | + +----------------------------+--------+---------+---------------------+-----------+ + | default_cluster:iceberg_db | logs_1 | success | 2022-01-24 19:42:45 | | + +----------------------------+--------+---------+---------------------+-----------+ + +## keyword + + SHOW,TABLE CREATION + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/property/PropertySchema.java b/fe/fe-common/src/main/java/org/apache/doris/common/property/PropertySchema.java index ca9063ef6694c4..867106cfe6a24b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/property/PropertySchema.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/property/PropertySchema.java @@ -225,7 +225,7 @@ public void write(Boolean val, DataOutput out) throws IOException { public static final class DateProperty extends PropertySchema { SimpleDateFormat dateFormat; - DateProperty(String name, SimpleDateFormat dateFormat) { + public 
DateProperty(String name, SimpleDateFormat dateFormat) { super(name); this.dateFormat = dateFormat; } diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 10331dbbd89d24..9b1110859bdd7e 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -587,6 +587,18 @@ under the License. hadoop-hdfs provided + + + org.apache.iceberg + iceberg-core + provided + + + + org.apache.iceberg + iceberg-hive-metastore + provided + palo-fe diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 8d05458fe22612..11e244805b372f 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -239,7 +239,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BLOB, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, KW_CLEAN, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_COMPACT, - KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, + KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE, @@ -557,6 +557,7 @@ precedence left KW_PARTITION; precedence left KW_PARTITIONS; precedence right KW_TEMPORARY; precedence right 
LBRACKET; +precedence left KW_ENGINE; // unused // nonterminal Expr where_clause_without_null, List col_list, opt_charset_name, AlterClause alter_cluster_clause; @@ -1193,13 +1194,13 @@ opt_intermediate_type ::= // Create Statement create_stmt ::= /* Database */ - KW_CREATE KW_DATABASE opt_if_not_exists:ifNotExists ident:db + KW_CREATE KW_DATABASE opt_if_not_exists:ifNotExists ident:db opt_properties:properties {: - RESULT = new CreateDbStmt(ifNotExists, db); + RESULT = new CreateDbStmt(ifNotExists, db, properties); :} | KW_CREATE KW_SCHEMA opt_if_not_exists:ifNotExists ident:db {: - RESULT = new CreateDbStmt(ifNotExists, db); + RESULT = new CreateDbStmt(ifNotExists, db, null); :} /* cluster */ /* KW_CREATE KW_CLUSTER ident:name opt_properties:properties KW_IDENTIFIED KW_BY STRING_LITERAL:password @@ -1256,6 +1257,11 @@ create_stmt ::= RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition, distribution, tblProperties, extProperties, tableComment, index); :} + | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name KW_ENGINE EQUAL ident:engineName + properties:properties opt_comment:tableComment + {: + RESULT = new CreateTableStmt(ifNotExists, isExternal, name, engineName, properties, tableComment); + :} | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name opt_col_list:columns opt_engine:engineName @@ -2848,6 +2854,11 @@ show_param ::= {: RESULT = new ShowColumnStatsStmt(tbl); :} + /* show table creation statement */ + | KW_TABLE KW_CREATION opt_db:db opt_wild_where + {: + RESULT = new ShowTableCreationStmt(db, parser.wild); + :} ; opt_tmp ::= @@ -5419,6 +5430,8 @@ keyword ::= {: RESULT = id; :} | KW_CONVERT:id {: RESULT = id; :} + | KW_CREATION:id + {: RESULT = id; :} | KW_DATA:id {: RESULT = id; :} | KW_DATE:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java index 71cf4f6372c297..bd32419fe18e38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java @@ -138,6 +138,10 @@ public static ColumnDef newSequenceColumnDef(Type type, AggregateType aggregateT public TypeDef getTypeDef() { return typeDef; } public Type getType() { return typeDef.getType(); } + public String getComment() { + return comment; + } + public boolean isVisible() { return visible; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDbStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDbStmt.java index a00a059f56cfee..e6b6f71177c1ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDbStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDbStmt.java @@ -19,24 +19,29 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Strings; +import java.util.HashMap; +import java.util.Map; + // 用于描述CREATE DATABASE的内部结构 public class CreateDbStmt extends DdlStmt { private boolean ifNotExists; private String dbName; + private Map properties; - public CreateDbStmt(boolean ifNotExists, String dbName) { + public CreateDbStmt(boolean ifNotExists, String dbName, Map properties) { this.ifNotExists = ifNotExists; this.dbName = dbName; + this.properties = properties == null ? 
new HashMap<>() : properties; } public String getFullDbName() { @@ -47,8 +52,12 @@ public boolean isSetIfNotExists() { return ifNotExists; } + public Map getProperties() { + return properties; + } + @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); if (Strings.isNullOrEmpty(analyzer.getClusterName())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER); @@ -70,6 +79,11 @@ public String toString() { public String toSql() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("CREATE DATABASE ").append("`").append(dbName).append("`"); + if (properties.size() > 0) { + stringBuilder.append("\nPROPERTIES (\n"); + stringBuilder.append(new PrintableMap<>(properties, "=", true, true, false)); + stringBuilder.append("\n)"); + } return stringBuilder.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index ac4233176b8f3a..ed2b68903a6b0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -88,6 +88,7 @@ public class CreateTableStmt extends DdlStmt { engineNames.add("broker"); engineNames.add("elasticsearch"); engineNames.add("hive"); + engineNames.add("iceberg"); } // for backup. set to -1 for normal use @@ -167,6 +168,22 @@ public CreateTableStmt(boolean ifNotExists, this.rollupAlterClauseList = rollupAlterClauseList == null ? 
new ArrayList<>() : rollupAlterClauseList; } + // This is for iceberg table, which has no column schema + public CreateTableStmt(boolean ifNotExists, + boolean isExternal, + TableName tableName, + String engineName, + Map properties, + String comment) { + this.ifNotExists = ifNotExists; + this.isExternal = isExternal; + this.tableName = tableName; + this.engineName = engineName; + this.properties = properties; + this.columnDefs = Lists.newArrayList(); + this.comment = Strings.nullToEmpty(comment); + } + public void addColumnDef(ColumnDef columnDef) { columnDefs.add(columnDef); } public void setIfNotExists(boolean ifNotExists) { this.ifNotExists = ifNotExists; } @@ -339,7 +356,7 @@ public void analyze(Analyzer analyzer) throws UserException { } // analyze column def - if (columnDefs == null || columnDefs.isEmpty()) { + if (!engineName.equals("iceberg") && (columnDefs == null || columnDefs.isEmpty())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS); } // add a hidden column as delete flag for unique table @@ -473,7 +490,7 @@ private void analyzeEngineName() throws AnalysisException { } if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker") - || engineName.equals("elasticsearch") || engineName.equals("hive")) { + || engineName.equals("elasticsearch") || engineName.equals("hive") || engineName.equals("iceberg")) { if (!isExternal) { // this is for compatibility isExternal = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableCreationStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableCreationStmt.java new file mode 100644 index 00000000000000..97728ea1d2a498 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableCreationStmt.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Strings; + +/** + * Show table creation records in Iceberg database + * + * Syntax: + * SHOW TABLE CREATION [FROM db] [LIKE mask] + */ +public class ShowTableCreationStmt extends ShowStmt { + + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("Database", ScalarType.createVarchar(20))) + .addColumn(new Column("Table", ScalarType.createVarchar(20))) + .addColumn(new Column("Status", ScalarType.createVarchar(10))) + .addColumn(new Column("Create Time", ScalarType.createVarchar(20))) + .addColumn(new Column("Error Msg", ScalarType.createVarchar(100))) + .build(); + + private String dbName; + private String wild; + + public ShowTableCreationStmt(String db, String wild) { + this.dbName = db; + this.wild = wild; + } + + public String getDbName() { + return dbName; + } + + public String getWild() { + return wild; + } + + @Override 
+ public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + if (Strings.isNullOrEmpty(dbName)) { + dbName = analyzer.getDefaultDb(); + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } else { + dbName = ClusterNamespace.getFullName(getClusterName(), dbName); + } + } + + public boolean like(String str) { + str = str.toLowerCase(); + return str.matches(wild.replace(".", "\\.").replace("?", ".").replace("%", ".*").toLowerCase()); + } + + @Override + public String toSql() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("SHOW TABLE CREATION FROM "); + if (!Strings.isNullOrEmpty(dbName)) { + stringBuilder.append("`").append(dbName).append("` "); + } + + if (!Strings.isNullOrEmpty(wild)) { + stringBuilder.append("LIKE ").append("`").append(wild).append("`"); + } + return stringBuilder.toString(); + } + + @Override + public String toString() { + return toSql(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d3077aaccf8d9e..71db9123b56d47 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -151,6 +151,8 @@ import org.apache.doris.deploy.impl.K8sDeployManager; import org.apache.doris.deploy.impl.LocalFileDeployManager; import org.apache.doris.external.elasticsearch.EsRepository; +import org.apache.doris.external.iceberg.IcebergCatalogMgr; +import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; @@ -327,6 +329,7 @@ public class Catalog { private Load load; private LoadManager loadManager; private StreamLoadRecordMgr streamLoadRecordMgr; + 
private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr; private RoutineLoadManager routineLoadManager; private SqlBlockRuleMgr sqlBlockRuleMgr; private ExportMgr exportMgr; @@ -595,6 +598,7 @@ private Catalog(boolean isCheckpointCatalog) { this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000); + this.icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr(); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); @@ -1370,6 +1374,7 @@ private void startMasterOnlyDaemonThreads() { // start daemon thread to update global partition in memory information periodically partitionInMemoryInfoCollector.start(); streamLoadRecordMgr.start(); + icebergTableCreationRecordMgr.start(); } // start threads that should running on all FE @@ -2631,7 +2636,14 @@ public Frontend getFeByName(String name) { public void createDb(CreateDbStmt stmt) throws DdlException { final String clusterName = stmt.getClusterName(); String fullDbName = stmt.getFullDbName(); - long id = 0L; + Map properties = stmt.getProperties(); + + long id = getNextId(); + Database db = new Database(id, fullDbName); + db.setClusterName(clusterName); + // check and analyze database properties before create database + db.getDbProperties().addAndBuildProperties(properties); + if (!tryLock(false)) { throw new DdlException("Failed to acquire catalog lock. 
Try again"); } @@ -2647,9 +2659,6 @@ public void createDb(CreateDbStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName); } } else { - id = getNextId(); - Database db = new Database(id, fullDbName); - db.setClusterName(clusterName); unprotectCreateDb(db); editLog.logCreateDb(db); } @@ -2657,6 +2666,11 @@ public void createDb(CreateDbStmt stmt) throws DdlException { unlock(); } LOG.info("createDb dbName = " + fullDbName + ", id = " + id); + + // create tables in iceberg database + if (db.getDbProperties().getIcebergProperty().isExist()) { + icebergTableCreationRecordMgr.registerDb(db); + } } // For replay edit log, need't lock metadata @@ -2771,6 +2785,10 @@ public void dropDb(DropDbStmt stmt) throws DdlException { } public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) { + // drop Iceberg database table creation records + if (db.getDbProperties().getIcebergProperty().isExist()) { + icebergTableCreationRecordMgr.deregisterDb(db); + } for (Table table : db.getTables()) { table.writeLock(); try { @@ -3062,6 +3080,9 @@ public void createTable(CreateTableStmt stmt) throws UserException { } else if (engineName.equalsIgnoreCase("hive")) { createHiveTable(db, stmt); return; + } else if (engineName.equalsIgnoreCase("iceberg")) { + IcebergCatalogMgr.createIcebergTable(db, stmt); + return; } else { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName); } @@ -4064,11 +4085,11 @@ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlExcept long tableId = getNextId(); HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties()); hiveTable.setComment(stmt.getComment()); - // check hive table if exists in hive database + // check hive table whether exists in hive database HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS)); if 
(!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hiveTable.getHiveDb(), hiveTable.getHiveTable())) { - throw new DdlException("Table is not exists in hive: " + hiveTable.getHiveDbTable()); + throw new DdlException(String.format("Table [%s] does not exist in Hive.", hiveTable.getHiveDbTable())); } // check hive table if exists in doris database if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) { @@ -4365,6 +4386,17 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, Table table, List< sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n"); sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, false).toString()); sb.append("\n)"); + } else if (table.getType() == TableType.ICEBERG) { + IcebergTable icebergTable = (IcebergTable) table; + if (!Strings.isNullOrEmpty(table.getComment())) { + sb.append("\nCOMMENT \"").append(table.getComment(true)).append("\""); + } + // properties + sb.append("\nPROPERTIES (\n"); + sb.append("\"iceberg.database\" = \"").append(icebergTable.getIcebergDb()).append("\",\n"); + sb.append("\"iceberg.table\" = \"").append(icebergTable.getIcebergTbl()).append("\",\n"); + sb.append(new PrintableMap<>(icebergTable.getIcebergProperties(), " = ", true, true, false).toString()); + sb.append("\n)"); } createTableStmt.add(sb.toString()); @@ -4620,6 +4652,9 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, // drop all temp partitions of this table, so that there is no temp partitions in recycle bin, // which make things easier. 
((OlapTable) table).dropAllTempPartitions(); + } else if (table.getType() == TableType.ICEBERG) { + // drop Iceberg database table creation record + icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table); } db.dropTable(table.getName()); @@ -5051,6 +5086,10 @@ public StreamLoadRecordMgr getStreamLoadRecordMgr() { return streamLoadRecordMgr; } + public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() { + return icebergTableCreationRecordMgr; + } + public MasterTaskExecutor getPendingLoadTaskScheduler() { return pendingLoadTaskScheduler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 9699c8bcb9297e..1803c787abed40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -202,6 +202,10 @@ public long getReplicaQuota() { return replicaQuotaSize; } + public DatabaseProperty getDbProperties() { + return dbProperties; + } + public long getUsedDataQuotaWithLock() { long usedDataQuota = 0; readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java index 27ef78e0c5816c..ec3e830b2d3331 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java @@ -17,23 +17,42 @@ package org.apache.doris.catalog; +import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.external.iceberg.IcebergCatalog; +import org.apache.doris.external.iceberg.IcebergCatalogMgr; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import 
org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +/** + * DatabaseProperty contains additional information about a database. + * + * Different properties are recognized by prefix, such as `iceberg` + * If there is different type property is added, write a method, + * such as `checkAndBuildIcebergProperty` to check and build it. + */ public class DatabaseProperty implements Writable { + private static final Logger LOG = LogManager.getLogger(DatabaseProperty.class); + + public static final String ICEBERG_PROPERTY_PREFIX = "iceberg"; @SerializedName(value = "properties") private Map properties = Maps.newHashMap(); + // the following variables are built from "properties" + private IcebergProperty icebergProperty = new IcebergProperty(Maps.newHashMap()); + public DatabaseProperty() { } @@ -50,6 +69,37 @@ public String getOrDefault(String key, String defaultVal) { return properties.getOrDefault(key, defaultVal); } + public Map getProperties() { + return properties; + } + + public IcebergProperty getIcebergProperty() { + return icebergProperty; + } + + public void addAndBuildProperties(Map properties) throws DdlException { + this.properties.putAll(properties); + Map icebergProperties = new HashMap<>(); + for (Map.Entry entry : this.properties.entrySet()) { + if (entry.getKey().startsWith(ICEBERG_PROPERTY_PREFIX)) { + icebergProperties.put(entry.getKey(), entry.getValue()); + } + } + if (icebergProperties.size() > 0) { + checkAndBuildIcebergProperty(icebergProperties); + } + } + + private void checkAndBuildIcebergProperty(Map properties) throws DdlException { + IcebergCatalogMgr.validateProperties(properties, false); + icebergProperty = new IcebergProperty(properties); + String icebergDb = icebergProperty.getDatabase(); + IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); + // check database exists + if 
(!icebergCatalog.databaseExists(icebergDb)) { + throw new DdlException("Database [" + icebergDb + "] does not exist in Iceberg."); + } + } @Override public void write(DataOutput out) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java new file mode 100644 index 00000000000000..9162d87ada383b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import java.util.Map; + +/** + * Iceberg property contains information to connect a remote iceberg db or table. 
+ */ +public class IcebergProperty { + public static final String ICEBERG_DATABASE = "iceberg.database"; + public static final String ICEBERG_TABLE = "iceberg.table"; + public static final String ICEBERG_HIVE_METASTORE_URIS = "iceberg.hive.metastore.uris"; + public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; + + private boolean exist; + + private String database; + private String table; + private String hiveMetastoreUris; + private String catalogType; + + public IcebergProperty(Map properties) { + if (properties != null && !properties.isEmpty()) { + this.exist = true; + this.database = properties.get(ICEBERG_DATABASE); + this.table = properties.get(ICEBERG_TABLE); + this.hiveMetastoreUris = properties.get(ICEBERG_HIVE_METASTORE_URIS); + this.catalogType = properties.get(ICEBERG_CATALOG_TYPE); + } else { + this.exist = false; + } + } + + public boolean isExist() { + return exist; + } + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public String getHiveMetastoreUris() { + return hiveMetastoreUris; + } + + public String getCatalogType() { + return catalogType; + } + + public String getProperties() { + return ""; + } + + public void setTable(String table) { + this.table = table; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java new file mode 100644 index 00000000000000..c16493690f9711 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + + +import org.apache.doris.common.io.Text; +import org.apache.doris.thrift.TIcebergTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * External Iceberg table + */ +public class IcebergTable extends Table { + private static final Logger LOG = LogManager.getLogger(IcebergTable.class); + + // remote Iceberg database name + private String icebergDb; + // remote Iceberg table name + private String icebergTbl; + private Map icebergProperties = Maps.newHashMap(); + + private org.apache.iceberg.Table icebergTable; + + public IcebergTable() { + super(TableType.ICEBERG); + } + + public IcebergTable(long id, String tableName, List fullSchema, IcebergProperty icebergProperty, + org.apache.iceberg.Table icebergTable) { + super(id, tableName, TableType.ICEBERG, fullSchema); + this.icebergDb = icebergProperty.getDatabase(); + this.icebergTbl = icebergProperty.getTable(); + + icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, + icebergProperty.getHiveMetastoreUris()); + icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, + icebergProperty.getCatalogType()); + this.icebergTable = icebergTable; + } + + public String getIcebergDbTable() { + return String.format("%s.%s", icebergDb, 
icebergTbl); + } + + public String getIcebergDb() { + return icebergDb; + } + + public void setIcebergDb(String icebergDb) { + this.icebergDb = icebergDb; + } + + public String getIcebergTbl() { + return icebergTbl; + } + + public void setIcebergTbl(String icebergTbl) { + this.icebergTbl = icebergTbl; + } + + public Map getIcebergProperties() { + return icebergProperties; + } + + public void setIcebergProperties(Map icebergProperties) { + this.icebergProperties = icebergProperties; + } + + public org.apache.iceberg.Table getIcebergTable() { + return icebergTable; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + + Text.writeString(out, icebergDb); + Text.writeString(out, icebergTbl); + + out.writeInt(icebergProperties.size()); + for (Map.Entry entry : icebergProperties.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + icebergDb = Text.readString(in); + icebergTbl = Text.readString(in); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String value = Text.readString(in); + icebergProperties.put(key, value); + } + } + + @Override + public TTableDescriptor toThrift() { + TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + fullSchema.size(), 0, getName(), ""); + tTableDescriptor.setIcebergTable(tIcebergTable); + return tTableDescriptor; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index ecf1e1e4937f9a..919a94c53552dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -63,7 +63,8 @@ 
public enum TableType { VIEW, BROKER, ELASTICSEARCH, - HIVE + HIVE, + ICEBERG } protected long id; @@ -271,6 +272,8 @@ public static Table read(DataInput in) throws IOException { table = new EsTable(); } else if (type == TableType.HIVE) { table = new HiveTable(); + } else if (type == TableType.ICEBERG) { + table = new IcebergTable(); } else { throw new IOException("Unknown table type: " + type.name()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 3194d1ac081c32..a3f171e04cf634 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -700,6 +700,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_stream_load_record_size = 5000; + /** + * Default max number of recent iceberg database table creation record that can be stored in memory. + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_iceberg_table_creation_record_size = 2000; + /** * Whether to disable show stream load and clear stream load records in memory. */ @@ -1080,6 +1086,12 @@ public class Config extends ConfigBase { @ConfField public static long es_state_sync_interval_second = 10; + /** + * fe will create iceberg table every iceberg_table_creation_interval_second + */ + @ConfField + public static long iceberg_table_creation_interval_second = 10; + /** * the factor of delay time before deciding to repair tablet. * if priority is VERY_HIGH, repair it immediately. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java new file mode 100644 index 00000000000000..6c5748091a7e8d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.iceberg; + +/** + * Exception class for Iceberg in Doris + */ +public class DorisIcebergException extends RuntimeException { + + public DorisIcebergException(String message) { + super(message); + } + + public DorisIcebergException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java new file mode 100644 index 00000000000000..21a3db8dddb0ec --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.iceberg; + +import org.apache.doris.catalog.IcebergProperty; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * HiveCatalog of Iceberg + */ +public class HiveCatalog implements IcebergCatalog { + private static final Logger LOG = LogManager.getLogger(HiveCatalog.class); + + private org.apache.iceberg.hive.HiveCatalog hiveCatalog; + + public HiveCatalog() { + hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + } + + @Override + public void initialize(IcebergProperty icebergProperty) { + // set hadoop conf + Configuration conf = new Configuration(); + hiveCatalog.setConf(conf); + // initialize hive catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("uri", icebergProperty.getHiveMetastoreUris()); + hiveCatalog.initialize("hive", catalogProperties); + } + + @Override + public boolean tableExists(TableIdentifier tableIdentifier) { + return hiveCatalog.tableExists(tableIdentifier); + } + + @Override + public Table 
loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException { + try { + return hiveCatalog.loadTable(tableIdentifier); + } catch (Exception e) { + LOG.warn("Failed to load table[{}] from database[{}], with error: {}", + tableIdentifier.name(), tableIdentifier.namespace(), e.getMessage()); + throw new DorisIcebergException(String.format("Failed to load table[%s] from database[%s]", + tableIdentifier.name(), tableIdentifier.namespace()), e); + } + } + + @Override + public List listTables(String db) throws DorisIcebergException { + try { + return hiveCatalog.listTables(Namespace.of(db)); + } catch (Exception e) { + LOG.warn("Failed to list table in database[{}], with error: {}", db, e.getMessage()); + throw new DorisIcebergException(String.format("Failed to list table in database[%s]", db), e); + } + } + + @Override + public boolean databaseExists(String db) { + return hiveCatalog.namespaceExists(Namespace.of(db)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java new file mode 100644 index 00000000000000..dda3b876b035de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.iceberg; + + +import org.apache.doris.catalog.IcebergProperty; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; + +import java.util.List; + +/** + * A Catalog API for iceberg table and namespace. + */ +public interface IcebergCatalog { + /** + * Initialize a catalog given a map of catalog properties. + * @param icebergProperty + */ + default void initialize(IcebergProperty icebergProperty) { + } + + /** + * Check whether table exists. + * @param tableIdentifier + */ + default boolean tableExists(TableIdentifier tableIdentifier) { + return false; + } + + /** + * Load a table + * @param tableIdentifier + */ + Table loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException; + + /** + * Return all the identifiers under this db. + * @param db + */ + List listTables(String db) throws DorisIcebergException; + + /** + * Checks whether the database exists. + * + * @param db + */ + default boolean databaseExists(String db) { + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java new file mode 100644 index 00000000000000..115c89c24cb8a6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.IcebergProperty;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.external.iceberg.util.IcebergUtils;

import com.google.common.base.Enums;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.doris.catalog.IcebergProperty.ICEBERG_CATALOG_TYPE;
import static org.apache.doris.catalog.IcebergProperty.ICEBERG_DATABASE;
import static org.apache.doris.catalog.IcebergProperty.ICEBERG_HIVE_METASTORE_URIS;
import static org.apache.doris.catalog.IcebergProperty.ICEBERG_TABLE;
import static org.apache.doris.common.SystemIdGenerator.getNextId;

/**
 * Iceberg catalog manager.
 *
 * Caches one {@link IcebergCatalog} per Hive metastore URI, validates the
 * user-supplied iceberg.* properties, and creates Doris tables that mirror
 * remote Iceberg tables.
 */
public class IcebergCatalogMgr {
    private static final Logger LOG = LogManager.getLogger(IcebergCatalogMgr.class);

    private static final String PROPERTY_MISSING_MSG = "Iceberg %s is null. "
            + "Please add properties('%s'='xxx') when creating an iceberg database.";

    // hive metastore uri -> iceberg catalog
    // catalogs talking to the same metastore are created once and shared
    private static final ConcurrentHashMap<String, IcebergCatalog> metastoreUriToCatalog =
            new ConcurrentHashMap<>();

    // TODO:(qjl) We'll support more types of Iceberg catalog.
    public enum CatalogType {
        HIVE_CATALOG
    }

    /**
     * Returns the cached catalog for the metastore URI of the given property,
     * creating and caching it on first use.
     *
     * @throws DdlException if the catalog type is unknown or initialization fails
     */
    public static IcebergCatalog getCatalog(IcebergProperty icebergProperty) throws DdlException {
        String uri = icebergProperty.getHiveMetastoreUris();
        IcebergCatalog catalog = metastoreUriToCatalog.get(uri);
        if (catalog == null) {
            // BUGFIX: the original containsKey()/put() pair was a check-then-act
            // race; with putIfAbsent() a concurrently created catalog is never
            // silently replaced.
            IcebergCatalog created = createCatalog(icebergProperty);
            IcebergCatalog previous = metastoreUriToCatalog.putIfAbsent(uri, created);
            catalog = previous == null ? created : previous;
        }
        return catalog;
    }

    private static IcebergCatalog createCatalog(IcebergProperty icebergProperty) throws DdlException {
        CatalogType type = CatalogType.valueOf(icebergProperty.getCatalogType());
        IcebergCatalog catalog;
        switch (type) {
            case HIVE_CATALOG:
                catalog = new HiveCatalog();
                break;
            default:
                throw new DdlException("Unsupported catalog type: " + type);
        }
        catalog.initialize(icebergProperty);
        return catalog;
    }

    /**
     * Validates the iceberg.* properties of a CREATE TABLE / CREATE DATABASE
     * statement.
     *
     * @param properties user-supplied properties; when no catalog type is given,
     *                   the default (HIVE_CATALOG) is written back into this map
     *                   so downstream IcebergProperty construction sees it
     * @param isTable    true when validating a table, in which case
     *                   iceberg.table is also required
     * @throws DdlException if a required property is missing or unknown
     *                      properties remain
     */
    public static void validateProperties(Map<String, String> properties, boolean isTable) throws DdlException {
        if (properties.isEmpty()) {
            throw new DdlException("Please set properties of iceberg, "
                    + "they are: 'iceberg.database' and 'iceberg.hive.metastore.uris'");
        }

        Map<String, String> copiedProps = Maps.newHashMap(properties);
        String icebergDb = copiedProps.get(ICEBERG_DATABASE);
        if (Strings.isNullOrEmpty(icebergDb)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG, ICEBERG_DATABASE, ICEBERG_DATABASE));
        }
        copiedProps.remove(ICEBERG_DATABASE);

        // check hive.metastore.uris
        String hiveMetastoreUris = copiedProps.get(ICEBERG_HIVE_METASTORE_URIS);
        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG, ICEBERG_HIVE_METASTORE_URIS,
                    ICEBERG_HIVE_METASTORE_URIS));
        }
        copiedProps.remove(ICEBERG_HIVE_METASTORE_URIS);

        // check iceberg catalog type, defaulting to HIVE_CATALOG
        String icebergCatalogType = copiedProps.get(ICEBERG_CATALOG_TYPE);
        if (Strings.isNullOrEmpty(icebergCatalogType)) {
            icebergCatalogType = CatalogType.HIVE_CATALOG.name();
            properties.put(ICEBERG_CATALOG_TYPE, icebergCatalogType);
        } else {
            copiedProps.remove(ICEBERG_CATALOG_TYPE);
        }

        if (!Enums.getIfPresent(CatalogType.class, icebergCatalogType).isPresent()) {
            throw new DdlException("Unknown catalog type: " + icebergCatalogType
                    + ". Current only support HiveCatalog.");
        }

        // only check the table property when validating an iceberg table
        if (isTable) {
            String icebergTbl = copiedProps.get(ICEBERG_TABLE);
            if (Strings.isNullOrEmpty(icebergTbl)) {
                throw new DdlException(String.format(PROPERTY_MISSING_MSG, ICEBERG_TABLE, ICEBERG_TABLE));
            }
            copiedProps.remove(ICEBERG_TABLE);
        }

        if (!copiedProps.isEmpty()) {
            throw new DdlException("Unknown table properties: " + copiedProps.toString());
        }
    }

    /**
     * Builds a Doris IcebergTable from the remote Iceberg table identified by
     * {@code identifier}.
     *
     * @param tableName       table name in Doris
     * @param icebergProperty Iceberg connection properties
     * @param identifier      Iceberg table identifier
     * @param isTable         when true, existence of the remote table is verified
     * @return the IcebergTable to register in Doris
     * @throws DdlException if the remote table does not exist
     */
    public static IcebergTable getTableFromIceberg(String tableName, IcebergProperty icebergProperty,
                                                   TableIdentifier identifier,
                                                   boolean isTable) throws DdlException {
        long tableId = getNextId();
        IcebergCatalog icebergCatalog = getCatalog(icebergProperty);

        if (isTable && !icebergCatalog.tableExists(identifier)) {
            // BUGFIX: error message typo "dose" -> "does"
            throw new DdlException(String.format("Table [%s] does not exist in Iceberg.", identifier.toString()));
        }

        // load the remote iceberg table to obtain its schema
        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(identifier);

        // convert the iceberg schema to Doris columns
        List<Column> columns = IcebergUtils.createSchemaFromIcebergSchema(icebergTable.schema());

        // create the new iceberg table in doris
        return new IcebergTable(tableId, tableName, columns, icebergProperty, icebergTable);
    }

    /**
     * Creates an iceberg table in Doris:
     *
     * 1. check table existence in Iceberg
     * 2. get table schema from Iceberg
     * 3. convert Iceberg table schema to Doris table schema
     * 4. create the associated table in Doris
     *
     * @param db   target Doris database
     * @param stmt the CREATE TABLE statement carrying the iceberg.* properties
     * @throws DdlException on invalid properties or name collision
     */
    public static void createIcebergTable(Database db, CreateTableStmt stmt) throws DdlException {
        String tableName = stmt.getTableName();
        Map<String, String> properties = stmt.getProperties();

        // validate iceberg table properties
        validateProperties(properties, true);
        IcebergProperty icebergProperty = new IcebergProperty(properties);

        IcebergTable table = getTableFromIceberg(tableName, icebergProperty,
                TableIdentifier.of(icebergProperty.getDatabase(), icebergProperty.getTable()), true);

        // register the table; fail unless IF NOT EXISTS was given
        if (!db.createTableWithLock(table, false, stmt.isSetIfNotExists()).first) {
            ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
        }
        LOG.info("successfully create table[{}-{}]", tableName, table.getId());
    }
}
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.iceberg; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents the record of Iceberg table automating creation in an Iceberg database + */ +public class IcebergTableCreationRecord { + private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecord.class); + + private String db; + private String table; + private String status; + private String createTime; + private String errorMsg; + + public IcebergTableCreationRecord(String db, String table, String status, String createTime, String errorMsg) { + this.db = db; + this.table = table; + this.status = status; + this.createTime = createTime; + this.errorMsg = errorMsg; + } + + public List getTableCreationRecord() { + List record = new ArrayList<>(); + record.add(this.db); + record.add(this.table); + record.add(this.status); + record.add(this.createTime); + record.add(this.errorMsg); + return record; + } + + public String getDb() { + return db; + } + + public String getTable() { + return table; + } + + public String getStatus() { + return status; + } + + public String getCreateTime() { + return createTime; + } + + public String getErrorMsg() { + return errorMsg; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java new file mode 100644 index 00000000000000..7cbed68860eb4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.doris.external.iceberg;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.IcebergProperty;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.property.PropertySchema;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Maps;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Manager for Iceberg automatic table-creation records.
 *
 * Periodically (as a MasterDaemon) lists tables of registered remote Iceberg
 * databases, creates the corresponding Doris tables, and keeps a bounded
 * history of creation attempts for the SHOW TABLE CREATION statement.
 */
public class IcebergTableCreationRecordMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecordMgr.class);

    private static final String SUCCESS = "success";
    private static final String FAIL = "fail";

    // Iceberg databases pending a remote table listing, keyed by database id.
    // Each db is consumed (removed) after one listing attempt.
    private Map<Long, Database> icebergDbs = new ConcurrentHashMap<>();
    // database -> table identifier -> properties; tables waiting to be created
    private Map<Database, Map<TableIdentifier, IcebergProperty>> dbToTableIdentifiers = Maps.newConcurrentMap();
    // db name -> table name -> creation record, used for the show stmt
    private Map<String, Map<String, IcebergTableCreationRecord>> dbToTableToCreationRecord = Maps.newConcurrentMap();

    // records ordered by creation time (oldest first) to drive eviction
    private Queue<IcebergTableCreationRecord> tableCreationRecordQueue =
            new PriorityQueue<>(new TableCreationComparator());
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public IcebergTableCreationRecordMgr() {
        super("iceberg_table_creation_record_mgr", Config.iceberg_table_creation_interval_second * 1000);
    }

    /** Registers an Iceberg database whose remote tables should be listed. */
    public void registerDb(Database db) throws DdlException {
        long dbId = db.getId();
        icebergDbs.put(dbId, db);
        LOG.info("Register a new Iceberg database[{}-{}]", dbId, db.getFullName());
    }

    private void registerTable(Database db, TableIdentifier identifier, IcebergProperty icebergProperty) {
        dbToTableIdentifiers
                .computeIfAbsent(db, key -> Maps.newConcurrentMap())
                .put(identifier, icebergProperty);
        LOG.info("Register a new table[{}] to database[{}]", identifier.name(), db.getFullName());
    }

    /** Drops all bookkeeping for a database that is being removed. */
    public void deregisterDb(Database db) {
        icebergDbs.remove(db.getId());
        dbToTableIdentifiers.remove(db);
        dbToTableToCreationRecord.remove(db.getFullName());
        LOG.info("Deregister database[{}]", db.getFullName());
    }

    /** Drops all bookkeeping for a single table that is being removed. */
    public void deregisterTable(Database db, IcebergTable table) {
        Map<TableIdentifier, IcebergProperty> identifierToProperties = dbToTableIdentifiers.get(db);
        if (identifierToProperties != null) {
            identifierToProperties.remove(TableIdentifier.of(table.getIcebergDb(), table.getIcebergTbl()));
        }
        Map<String, IcebergTableCreationRecord> recordMap = dbToTableToCreationRecord.get(db.getFullName());
        if (recordMap != null) {
            recordMap.remove(table.getName());
        }
        LOG.info("Deregister table[{}] from database[{}]", table.getName(), db.getFullName());
    }

    // remove identifiers that already have a creation record (created or
    // failed) so they are not attempted again on the next run
    private void removeDuplicateTables() {
        for (Map.Entry<String, Map<String, IcebergTableCreationRecord>> entry
                : dbToTableToCreationRecord.entrySet()) {
            Catalog.getCurrentCatalog().getDb(entry.getKey()).ifPresent(db -> {
                Map<TableIdentifier, IcebergProperty> pending = dbToTableIdentifiers.get(db);
                if (pending == null) {
                    return;
                }
                String icebergDbName = db.getDbProperties().getIcebergProperty().getDatabase();
                for (String tableName : entry.getValue().keySet()) {
                    pending.remove(TableIdentifier.of(icebergDbName, tableName));
                }
            });
        }
    }

    @Override
    protected void runAfterCatalogReady() {
        PropertySchema.DateProperty prop =
                new PropertySchema.DateProperty("key", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        // List iceberg tables in registered dbs. Each db is removed from
        // icebergDbs after one attempt: it.remove() in the for-update clause
        // also runs on `continue`.
        for (Iterator<Map.Entry<Long, Database>> it = icebergDbs.entrySet().iterator(); it.hasNext(); it.remove()) {
            Database db = it.next().getValue();
            IcebergProperty icebergProperty = db.getDbProperties().getIcebergProperty();
            IcebergCatalog icebergCatalog;
            try {
                icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
            } catch (DdlException e) {
                addTableCreationRecord(db.getFullName(), "", FAIL,
                        prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                LOG.warn("Failed get Iceberg catalog, hive.metastore.uris[{}], error: {}",
                        icebergProperty.getHiveMetastoreUris(), e.getMessage());
                // BUGFIX: the original fell through and dereferenced the null catalog
                continue;
            }
            List<TableIdentifier> icebergTables;
            try {
                icebergTables = icebergCatalog.listTables(icebergProperty.getDatabase());
            } catch (DorisIcebergException e) {
                addTableCreationRecord(db.getFullName(), "", FAIL,
                        prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                LOG.warn("Failed list remote Iceberg database, hive.metastore.uris[{}], database[{}], error: {}",
                        icebergProperty.getHiveMetastoreUris(), icebergProperty.getDatabase(), e.getMessage());
                // BUGFIX: the original fell through and iterated over a null list
                continue;
            }
            for (TableIdentifier identifier : icebergTables) {
                // NOTE(review): the same IcebergProperty instance is mutated and
                // shared by every identifier of this db; verify IcebergTable only
                // relies on identifier.name() for the per-table name.
                icebergProperty.setTable(identifier.name());
                registerTable(db, identifier, icebergProperty);
            }
        }

        // create the pending tables in Doris
        for (Map.Entry<Database, Map<TableIdentifier, IcebergProperty>> entry : dbToTableIdentifiers.entrySet()) {
            Database db = entry.getKey();
            for (Map.Entry<TableIdentifier, IcebergProperty> innerEntry : entry.getValue().entrySet()) {
                TableIdentifier identifier = innerEntry.getKey();
                IcebergProperty icebergProperty = innerEntry.getValue();
                try {
                    // get doris table from iceberg
                    IcebergTable table = IcebergCatalogMgr.getTableFromIceberg(identifier.name(),
                            icebergProperty, identifier, false);
                    // check whether the table already exists in the doris database
                    if (!db.createTableWithLock(table, false, false).first) {
                        ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE,
                                table.getName(), ErrorCode.ERR_TABLE_EXISTS_ERROR.getCode());
                    }
                    addTableCreationRecord(db.getFullName(), table.getName(), SUCCESS,
                            prop.writeTimeFormat(new Date(System.currentTimeMillis())), "");
                    LOG.info("Successfully create table[{}-{}]", table.getName(), table.getId());
                } catch (Exception e) {
                    addTableCreationRecord(db.getFullName(), identifier.name(), FAIL,
                            prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                    LOG.warn("Failed create table[{}], error: {}", identifier.name(), e.getMessage());
                }
            }
        }
        removeDuplicateTables();
    }

    private void addTableCreationRecord(String db, String table, String status, String createTime, String errorMsg) {
        writeLock();
        try {
            // evict the oldest records (and their per-db entries) while full
            while (isQueueFull()) {
                IcebergTableCreationRecord oldest = tableCreationRecordQueue.poll();
                if (oldest == null) {
                    break;
                }
                Map<String, IcebergTableCreationRecord> tableRecords = dbToTableToCreationRecord.get(oldest.getDb());
                // BUGFIX: guard against the db having been deregistered in the
                // meantime (the original dereferenced a possibly-null map)
                if (tableRecords != null) {
                    tableRecords.remove(oldest.getTable());
                }
            }

            IcebergTableCreationRecord record = new IcebergTableCreationRecord(db, table, status, createTime, errorMsg);
            tableCreationRecordQueue.offer(record);
            // keep the first record for a given table, like the original
            dbToTableToCreationRecord
                    .computeIfAbsent(db, key -> new ConcurrentHashMap<>())
                    .putIfAbsent(table, record);
        } finally {
            writeUnlock();
        }
    }

    /** Returns the creation records of one database (empty list when none). */
    public List<IcebergTableCreationRecord> getTableCreationRecordByDb(String db) {
        readLock();
        try {
            Map<String, IcebergTableCreationRecord> tableToRecords = dbToTableToCreationRecord.get(db);
            if (tableToRecords == null) {
                return new ArrayList<>();
            }
            return new ArrayList<>(tableToRecords.values());
        } finally {
            readUnlock();
        }
    }

    // orders records by create time so the priority queue evicts oldest first
    class TableCreationComparator implements Comparator<IcebergTableCreationRecord> {
        @Override
        public int compare(IcebergTableCreationRecord r1, IcebergTableCreationRecord r2) {
            return r1.getCreateTime().compareTo(r2.getCreateTime());
        }
    }

    public boolean isQueueFull() {
        return tableCreationRecordQueue.size() >= Config.max_iceberg_table_creation_record_size;
    }

    private void readLock() {
        lock.readLock().lock();
    }

    private void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg.util;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.List;

/**
 * Converts a Doris type to the corresponding Iceberg type.
 *
 * Only scalar types are supported; struct/array/map raise
 * UnsupportedOperationException.
 */
public class DorisTypeToType extends DorisTypeVisitor<Type> {
    private final StructType root;
    private int nextId = 0;

    public DorisTypeToType() {
        this.root = null;
    }

    public DorisTypeToType(StructType root) {
        this.root = root;
        // the root struct's fields use the first ids
        this.nextId = root.getFields().size();
    }

    private int getNextId() {
        int next = nextId;
        nextId += 1;
        return next;
    }

    @Override
    public Type struct(StructType struct, List<Type> types) {
        throw new UnsupportedOperationException(
                "Not a supported type: " + struct.toSql(0));
    }

    @Override
    public Type field(StructField field, Type typeResult) {
        return typeResult;
    }

    @Override
    public Type array(ArrayType array, Type elementType) {
        throw new UnsupportedOperationException(
                "Not a supported type: " + array.toSql(0));
    }

    @Override
    public Type map(MapType map, Type keyType, Type valueType) {
        throw new UnsupportedOperationException(
                "Not a supported type: " + map.toSql(0));
    }

    @Override
    public Type atomic(org.apache.doris.catalog.Type atomic) {
        // BUGFIX: the original if-chain tested DECIMALV2 twice in a single
        // condition (copy-paste); the chain is now a switch on the primitive.
        switch (atomic.getPrimitiveType()) {
            case BOOLEAN:
                return Types.BooleanType.get();
            case TINYINT:
            case SMALLINT:
            case INT:
                return Types.IntegerType.get();
            case BIGINT:
            case LARGEINT:
                return Types.LongType.get();
            case FLOAT:
                return Types.FloatType.get();
            case DOUBLE:
                return Types.DoubleType.get();
            case CHAR:
            case VARCHAR:
                return Types.StringType.get();
            case DATE:
                return Types.DateType.get();
            case TIME:
                return Types.TimeType.get();
            case DECIMALV2:
                return Types.DecimalType.of(
                        ((ScalarType) atomic).getScalarPrecision(),
                        ((ScalarType) atomic).getScalarScale());
            case DATETIME:
                return Types.TimestampType.withZone();
            default:
                // unsupported types: HLL, BITMAP, BINARY, ...
                throw new UnsupportedOperationException(
                        "Not a supported type: " + atomic.getPrimitiveType());
        }
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg.util;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;

import com.google.common.collect.Lists;

import java.util.List;

/**
 * Generic visitor over Doris types: recursively walks struct, map and array
 * types and dispatches every atomic type to {@link #atomic(Type)}.
 * Subclasses override the callbacks they care about; the defaults yield null.
 *
 * @param <T> result type produced by the visitor
 */
public class DorisTypeVisitor<T> {
    public static <T> T visit(Type type, DorisTypeVisitor<T> visitor) {
        if (type instanceof StructType) {
            StructType structType = (StructType) type;
            List<StructField> structFields = structType.getFields();
            List<T> fieldResults = Lists.newArrayListWithExpectedSize(structFields.size());
            for (StructField structField : structFields) {
                T childResult = visit(structField.getType(), visitor);
                fieldResults.add(visitor.field(structField, childResult));
            }
            return visitor.struct(structType, fieldResults);
        }
        if (type instanceof MapType) {
            MapType mapType = (MapType) type;
            T keyResult = visit(mapType.getKeyType(), visitor);
            T valueResult = visit(mapType.getValueType(), visitor);
            return visitor.map(mapType, keyResult, valueResult);
        }
        if (type instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) type;
            T elementResult = visit(arrayType.getItemType(), visitor);
            return visitor.array(arrayType, elementResult);
        }
        return visitor.atomic(type);
    }

    public T struct(StructType struct, List<T> fieldResults) {
        return null;
    }

    public T field(StructField field, T typeResult) {
        return null;
    }

    public T array(ArrayType array, T elementResult) {
        return null;
    }

    public T map(MapType map, T keyResult, T valueResult) {
        return null;
    }

    public T atomic(Type atomic) {
        return null;
    }
}
+ +package org.apache.doris.external.iceberg.util; + + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TExprOpcode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.transforms.PartitionSpecVisitor; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Iceberg utils + */ +public class 
IcebergUtils {
    private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);

    // per-thread running column id used while building an Iceberg schema
    private static ThreadLocal<Integer> columnIdThreadLocal = ThreadLocal.withInitial(() -> 0);

    /**
     * Create an Iceberg schema from Doris column definitions.
     *
     * @param columnDefs columns of the CREATE TABLE statement
     * @return the resulting Iceberg schema
     * @throws UserException if a column carries an aggregate type
     */
    public static Schema createIcebergSchema(List<ColumnDef> columnDefs) throws UserException {
        columnIdThreadLocal.set(1);
        List<Types.NestedField> nestedFields = Lists.newArrayList();
        for (ColumnDef columnDef : columnDefs) {
            columnDef.analyze(false);
            if (columnDef.getAggregateType() != null) {
                throw new DdlException("Do not support aggregation column: " + columnDef.getName());
            }
            org.apache.iceberg.types.Type icebergType = convertDorisToIceberg(columnDef.getType());
            Types.NestedField nestedField = columnDef.isAllowNull()
                    ? Types.NestedField.optional(nextId(), columnDef.getName(), icebergType, columnDef.getComment())
                    : Types.NestedField.required(nextId(), columnDef.getName(), icebergType, columnDef.getComment());
            nestedFields.add(nestedField);
        }
        return new Schema(nestedFields);
    }

    /** Convert an Iceberg schema into the equivalent list of Doris columns. */
    public static List<Column> createSchemaFromIcebergSchema(Schema schema) throws DdlException {
        List<Column> dorisColumns = Lists.newArrayList();
        for (Types.NestedField icebergField : schema.columns()) {
            dorisColumns.add(nestedFieldToColumn(icebergField));
        }
        return dorisColumns;
    }

    /** Convert one Iceberg field into a Doris column (nullable per the field). */
    public static Column nestedFieldToColumn(Types.NestedField field) throws DdlException {
        Type dorisType = convertIcebergToDoris(field.type());
        return new Column(field.name(), dorisType, true, null, field.isOptional(), null, field.doc());
    }

    /**
     * Build the field-id -> field-name mapping of an Iceberg schema.
     *
     * @param schema iceberg table schema
     * @return id to name mapping
     */
    public static Map<Integer, String> getIdToName(Schema schema) {
        Map<Integer, String> idToName = new HashMap<>();
        schema.columns().forEach(field -> idToName.put(field.fieldId(), field.name()));
        return idToName;
    }

    /**
     * Return the source column names of all identity partitions of the spec;
     * every non-identity transform (bucket, truncate, time-based, ...) maps to
     * null and is filtered out.
     */
    public static List<String> getIdentityPartitionField(PartitionSpec spec) {
        return PartitionSpecVisitor.visit(spec,
                new PartitionSpecVisitor<String>() {
                    @Override
                    public String identity(String sourceName, int sourceId) {
                        return sourceName;
                    }

                    @Override
                    public String bucket(String sourceName, int sourceId, int numBuckets) {
                        return null;
                    }

                    @Override
                    public String truncate(String sourceName, int sourceId, int width) {
                        return null;
                    }

                    @Override
                    public String year(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String month(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String day(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String hour(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String alwaysNull(int fieldId, String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String unknown(int fieldId, String sourceName, int sourceId, String transform) {
                        return null;
                    }
                }
        ).stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    /**
     * Convert an {@link org.apache.iceberg.types.Type} to a {@link Type doris type}.
     *
     * @param type an iceberg Type
     * @return the equivalent doris type
     * @throws IllegalArgumentException if the type cannot be converted to doris
     */
    public static Type convertIcebergToDoris(org.apache.iceberg.types.Type type) {
        return TypeUtil.visit(type, new TypeToDorisType());
    }

+ * This conversion assigns fresh ids. + *

+ * Some data types are represented as the same doris type. These are converted to a default type. + * + * @param type a doris Type + * @return the equivalent Type + * @throws IllegalArgumentException if the type cannot be converted + */ + public static org.apache.iceberg.types.Type convertDorisToIceberg(Type type) { + return DorisTypeVisitor.visit(type, new DorisTypeToType()); + } + + public static Expression convertToIcebergExpr(Expr expr) { + if (expr == null) { + return null; + } + + // BoolLiteral + if (expr instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) expr; + boolean value = boolLiteral.getValue(); + if (value) { + return Expressions.alwaysTrue(); + } else { + return Expressions.alwaysFalse(); + } + } + + // CompoundPredicate + if (expr instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + switch (compoundPredicate.getOp()) { + case AND: { + Expression left = convertToIcebergExpr(compoundPredicate.getChild(0)); + Expression right = convertToIcebergExpr(compoundPredicate.getChild(1)); + if (left != null && right != null) { + return Expressions.and(left, right); + } + return null; + } + case OR: { + Expression left = convertToIcebergExpr(compoundPredicate.getChild(0)); + Expression right = convertToIcebergExpr(compoundPredicate.getChild(1)); + if (left != null && right != null) { + return Expressions.or(left, right); + } + return null; + } + case NOT: { + Expression child = convertToIcebergExpr(compoundPredicate.getChild(0)); + if (child != null) { + return Expressions.not(child); + } + return null; + } + default: + return null; + } + } + + TExprOpcode opCode = expr.getOpcode(); + switch (opCode) { + case EQ: + case NE: + case GE: + case GT: + case LE: + case LT: + case EQ_FOR_NULL: + BinaryPredicate eq = (BinaryPredicate) expr; + SlotRef slotRef = convertDorisExprToSlotRef(eq.getChild(0)); + LiteralExpr literalExpr = null; + if (slotRef == null && eq.getChild(0).isLiteral()) { + literalExpr = 
(LiteralExpr) eq.getChild(0); + slotRef = convertDorisExprToSlotRef(eq.getChild(1)); + } else if (eq.getChild(1).isLiteral()) { + literalExpr = (LiteralExpr) eq.getChild(1); + } + if (slotRef == null || literalExpr == null) { + return null; + } + String colName = slotRef.getColumnName(); + Object value = extractDorisLiteral(literalExpr); + if (value == null) { + if (opCode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) { + return Expressions.isNull(colName); + } else { + return null; + } + } + switch (opCode) { + case EQ: + case EQ_FOR_NULL: + return Expressions.equal(colName, value); + case NE: + return Expressions.not(Expressions.equal(colName, value)); + case GE: + return Expressions.greaterThanOrEqual(colName, value); + case GT: + return Expressions.greaterThan(colName, value); + case LE: + return Expressions.lessThanOrEqual(colName, value); + case LT: + return Expressions.lessThan(colName, value); + default: + return null; + } + default: + return null; + } + } + + private static Object extractDorisLiteral(Expr expr) { + if (!expr.isLiteral()) { + return null; + } + if (expr instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) expr; + return boolLiteral.getValue(); + } else if (expr instanceof DateLiteral) { + DateLiteral dateLiteral = (DateLiteral) expr; + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); + StringBuilder sb = new StringBuilder(); + sb.append(dateLiteral.getYear()) + .append(dateLiteral.getMonth()) + .append(dateLiteral.getDay()) + .append(dateLiteral.getHour()) + .append(dateLiteral.getMinute()) + .append(dateLiteral.getSecond()); + Date date; + try { + date = formatter.parse(sb.toString()); + } catch (ParseException e) { + return null; + } + return date.getTime(); + } else if (expr instanceof DecimalLiteral) { + DecimalLiteral decimalLiteral = (DecimalLiteral) expr; + return decimalLiteral.getValue(); + } else if (expr instanceof FloatLiteral) { + FloatLiteral floatLiteral = (FloatLiteral) 
expr; + return floatLiteral.getValue(); + } else if (expr instanceof IntLiteral) { + IntLiteral intLiteral = (IntLiteral) expr; + return intLiteral.getValue(); + } else if (expr instanceof StringLiteral) { + StringLiteral stringLiteral = (StringLiteral) expr; + return stringLiteral.getStringValue(); + } + return null; + } + + private static SlotRef convertDorisExprToSlotRef(Expr expr) { + SlotRef slotRef = null; + if (expr instanceof SlotRef) { + slotRef = (SlotRef) expr; + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof SlotRef) { + slotRef = (SlotRef) expr.getChild(0); + } + } + return slotRef; + } + + private static int findWidth(IntLiteral literal) { + Preconditions.checkArgument(literal.getValue() > 0 && literal.getValue() < Integer.MAX_VALUE, + "Unsupported width " + literal.getValue()); + return (int) literal.getValue(); + } + + public static int nextId() { + int nextId = columnIdThreadLocal.get(); + columnIdThreadLocal.set(nextId + 1); + return nextId; + } + + public static Set getAllDataFilesPath(org.apache.iceberg.Table table, TableOperations ops) { + org.apache.iceberg.Table dataFilesTable = MetadataTableUtils.createMetadataTableInstance( + ops, table.name(), table.name(), MetadataTableType.ALL_DATA_FILES); + + Set dataFilesPath = Sets.newHashSet(); + TableScan tableScan = dataFilesTable.newScan(); + List tasks = Lists.newArrayList(tableScan.planTasks()); + tasks.forEach(task -> + task.files().forEach(fileScanTask -> { + Lists.newArrayList(fileScanTask.asDataTask().rows()) + .forEach(row -> dataFilesPath.add(row.get(1, String.class))); + }) + ); + + return dataFilesPath; + } + + public static PartitionSpec buildPartitionSpec(Schema schema, List partitionNames) { + if (partitionNames == null || partitionNames.isEmpty()) { + return null; + } + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + for (String partitionName : partitionNames) { + builder.identity(partitionName); + } + return builder.build(); + } + +} 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/TypeToDorisType.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/TypeToDorisType.java new file mode 100644 index 00000000000000..926b95c64f1b17 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/TypeToDorisType.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.iceberg.util; + +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +import java.util.List; + +/** + * Convert Iceberg types to Doris type + */ +public class TypeToDorisType extends TypeUtil.SchemaVisitor { + public TypeToDorisType() { + } + + @Override + public Type schema(Schema schema, Type structType) { + return structType; + } + + @Override + public Type struct(Types.StructType struct, List fieldResults) { + throw new UnsupportedOperationException( + String.format("Cannot convert Iceberg type[%s] to Doris type.", struct)); + } + + @Override + public Type field(Types.NestedField field, Type fieldResult) { + return fieldResult; + } + + @Override + public Type list(Types.ListType list, Type elementResult) { + throw new UnsupportedOperationException( + String.format("Cannot convert Iceberg type[%s] to Doris type.", list)); + } + + @Override + public Type map(Types.MapType map, Type keyResult, Type valueResult) { + throw new UnsupportedOperationException( + String.format("Cannot convert Iceberg type[%s] to Doris type.", map)); + } + + @Override + public Type primitive(org.apache.iceberg.types.Type.PrimitiveType primitive) { + switch (primitive.typeId()) { + case BOOLEAN: + return Type.BOOLEAN; + case INTEGER: + return Type.INT; + case LONG: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case DECIMAL: + Types.DecimalType decimal = (Types.DecimalType) primitive; + return ScalarType.createDecimalV2Type(decimal.precision(), decimal.scale()); + case DATE: + return Type.DATE; + case TIMESTAMP: + return Type.DATETIME; + case STRING: + return Type.STRING; + // use varchar + case UUID: + return Type.VARCHAR; + // unsupported primitive type + case TIME: + case FIXED: + case BINARY: + default: + throw new 
UnsupportedOperationException(String.format("Cannot convert Iceberg type[%s] to Doris type.", + primitive)); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 69af8b11f8a4b6..4fca787215f6f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -156,8 +156,7 @@ public void readFields(DataInput in) throws IOException { break; } case OperationType.OP_CREATE_DB: { - data = new Database(); - ((Database) data).readFields(in); + data = Database.read(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index ef7d9df6d30769..99c770509f4d77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -72,6 +72,7 @@ import org.apache.doris.analysis.ShowStmt; import org.apache.doris.analysis.ShowStreamLoadStmt; import org.apache.doris.analysis.ShowSyncJobStmt; +import org.apache.doris.analysis.ShowTableCreationStmt; import org.apache.doris.analysis.ShowTableIdStmt; import org.apache.doris.analysis.ShowTableStatsStmt; import org.apache.doris.analysis.ShowTableStatusStmt; @@ -137,9 +138,11 @@ import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.OrderByPair; +import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.external.iceberg.IcebergTableCreationRecord; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; @@ -324,6 +327,8 @@ public 
ShowResultSet execute() throws AnalysisException { handleShowTableStats(); } else if (stmt instanceof ShowColumnStatsStmt) { handleShowColumnStats(); + } else if (stmt instanceof ShowTableCreationStmt) { + handleShowTableCreation(); } else { handleEmtpy(); } @@ -760,9 +765,15 @@ private void handleShowVariables() throws AnalysisException { private void handleShowCreateDb() throws AnalysisException { ShowCreateDbStmt showStmt = (ShowCreateDbStmt) stmt; List> rows = Lists.newArrayList(); - ctx.getCatalog().getDbOrAnalysisException(showStmt.getDb()); + Database db = ctx.getCatalog().getDbOrAnalysisException(showStmt.getDb()); StringBuilder sb = new StringBuilder(); sb.append("CREATE DATABASE `").append(ClusterNamespace.getNameFromFullName(showStmt.getDb())).append("`"); + if (db.getDbProperties().getProperties().size() > 0) { + sb.append("\nPROPERTIES (\n"); + sb.append(new PrintableMap<>(db.getDbProperties().getProperties(), "=", true, true, false)); + sb.append("\n)"); + } + rows.add(Lists.newArrayList(ClusterNamespace.getNameFromFullName(showStmt.getDb()), sb.toString())); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } @@ -2048,4 +2059,41 @@ public void handleShowSqlBlockRule() throws AnalysisException { resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } + private void handleShowTableCreation() throws AnalysisException { + ShowTableCreationStmt showStmt = (ShowTableCreationStmt) stmt; + String dbName = showStmt.getDbName(); + Database db = ctx.getCatalog().getDbOrAnalysisException(dbName); + + List records = ctx.getCatalog().getIcebergTableCreationRecordMgr().getTableCreationRecordByDb(dbName); + + List> rowSet = Lists.newArrayList(); + for (IcebergTableCreationRecord record : records) { + List row = record.getTableCreationRecord(); + // like predicate + if (Strings.isNullOrEmpty(showStmt.getWild()) || showStmt.like(record.getTable())) { + rowSet.add(row); + } + } + + // sort function rows by first column asc + ListComparator> comparator 
= null; + OrderByPair orderByPair = new OrderByPair(0, false); + comparator = new ListComparator<>(orderByPair); + Collections.sort(rowSet, comparator); + List> resultRowSet = Lists.newArrayList(); + + Set keyNameSet = new HashSet<>(); + for (List row : rowSet) { + List resultRow = Lists.newArrayList(); + for (Comparable column : row) { + resultRow.add(column.toString()); + } + resultRowSet.add(resultRow); + keyNameSet.add(resultRow.get(0)); + } + + ShowResultSetMetaData showMetaData = showStmt.getMetaData(); + resultSet = new ShowResultSet(showMetaData, resultRowSet); + } + } diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index a253c3f510eb09..c87a65e3e3be8c 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -145,6 +145,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("convert", new Integer(SqlParserSymbols.KW_CONVERT)); keywordMap.put("count", new Integer(SqlParserSymbols.KW_COUNT)); keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE)); + keywordMap.put("creation", new Integer(SqlParserSymbols.KW_CREATION)); keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS)); keywordMap.put("cube", new Integer(SqlParserSymbols.KW_CUBE)); keywordMap.put("current", new Integer(SqlParserSymbols.KW_CURRENT)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDbStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDbStmtTest.java index d3e93a3167b5f4..68504af0f8581c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDbStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDbStmtTest.java @@ -29,6 +29,9 @@ import mockit.Mocked; +import java.util.HashMap; +import java.util.Map; + public class CreateDbStmtTest { private Analyzer analyzer; @@ -46,7 +49,7 @@ public void setUp() { @Test public void testAnalyzeNormal() throws UserException, AnalysisException { - 
CreateDbStmt dbStmt = new CreateDbStmt(false, "test"); + CreateDbStmt dbStmt = new CreateDbStmt(false, "test", null); dbStmt.analyze(analyzer); Assert.assertEquals("testCluster:test", dbStmt.getFullDbName()); Assert.assertEquals("CREATE DATABASE `testCluster:test`", dbStmt.toString()); @@ -54,7 +57,32 @@ public void testAnalyzeNormal() throws UserException, AnalysisException { @Test(expected = AnalysisException.class) public void testAnalyzeWithException() throws UserException, AnalysisException { - CreateDbStmt stmt = new CreateDbStmt(false, ""); + CreateDbStmt stmt = new CreateDbStmt(false, "", null); + stmt.analyze(analyzer); + Assert.fail("no exception"); + } + + @Test + public void testAnalyzeIcebergNormal() throws UserException { + Map properties = new HashMap<>(); + properties.put("iceberg.database", "doris"); + properties.put("iceberg.hive.metastore.uris", "thrift://127.0.0.1:9087"); + CreateDbStmt stmt = new CreateDbStmt(false, "test", properties); + stmt.analyze(analyzer); + Assert.assertEquals("testCluster:test", stmt.getFullDbName()); + Assert.assertEquals("CREATE DATABASE `testCluster:test`\n" + + "PROPERTIES (\n" + + "\"iceberg.database\" = \"doris\",\n" + + "\"iceberg.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\"\n" + + ")", stmt.toString()); + } + + @Test(expected = AnalysisException.class) + public void testAnalyzeIcebergWithException() throws UserException { + Map properties = new HashMap<>(); + properties.put("iceberg.database", "doris"); + properties.put("iceberg.hive.metastore.uris", "thrift://127.0.0.1:9087"); + CreateDbStmt stmt = new CreateDbStmt(false, "test", properties); stmt.analyze(analyzer); Assert.fail("no exception"); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java index 2befaa68625466..552a8a6d15a489 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java @@ -38,7 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; +import java.util.Map; import mockit.Mocked; @@ -246,4 +248,21 @@ public void testBmpHllIncAgg() throws Exception { hll.toString(), hll.getTypeDef().getType().toSql())); stmt.analyze(analyzer); } + + @Test + public void testCreateIcebergTable() throws UserException { + Map properties = new HashMap<>(); + properties.put("iceberg.database", "doris"); + properties.put("iceberg.table", "test"); + properties.put("iceberg.hive.metastore.uris", "thrift://127.0.0.1:9087"); + CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "iceberg", properties, ""); + stmt.analyze(analyzer); + + Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n" + + "\n" + + ") ENGINE = iceberg\n" + + "PROPERTIES (\"iceberg.database\" = \"doris\",\n" + + "\"iceberg.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" + + "\"iceberg.table\" = \"test\")", stmt.toString()); + } } \ No newline at end of file diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowTableCreationStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowTableCreationStmtTest.java new file mode 100644 index 00000000000000..c4efe22d021652 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowTableCreationStmtTest.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import mockit.Mocked; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.qe.ConnectContext; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ShowTableCreationStmtTest { + private Analyzer analyzer; + + @Mocked + private PaloAuth auth; + @Mocked + private ConnectContext ctx; + + @Before + public void setUp() { + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + MockedAuth.mockedAuth(auth); + MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + } + + @Test + public void testNormal() throws UserException { + ShowTableCreationStmt stmt = new ShowTableCreationStmt("doris", "log"); + stmt.analyze(analyzer); + Assert.assertEquals("SHOW TABLE CREATION FROM `testCluster:doris` LIKE `log`", stmt.toString()); + Assert.assertEquals(4, stmt.getMetaData().getColumnCount()); + Assert.assertEquals("Table", stmt.getMetaData().getColumn(0).getName()); + Assert.assertEquals("Status", stmt.getMetaData().getColumn(1).getName()); + } + + @Test + public void testNoDb() throws UserException { + ShowTableCreationStmt stmt = new ShowTableCreationStmt(null, null); + stmt.analyze(analyzer); + Assert.assertEquals("testCluster:testDb", stmt.getDbName()); + } +} \ No newline at end of file diff --git a/fe/pom.xml b/fe/pom.xml index b98cf2b46cdae4..4534de9851db25 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -119,6 +119,7 @@ under the License. 
2.17.1 0.15-SNAPSHOT github + 0.12.0 @@ -613,6 +614,11 @@ under the License. tomcat jasper-compiler + + + org.apache.parquet + parquet-hadoop-bundle + @@ -684,6 +690,21 @@ under the License. + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + + + org.apache.iceberg + iceberg-hive-metastore + ${iceberg.version} + + org.apache.parquet parquet-column diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 467c4baea7dc99..f78bc5f6aa823f 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -243,6 +243,12 @@ struct THiveTable { 3: required map properties } +struct TIcebergTable { + 1: required string db_name + 2: required string table_name + 3: required map properties +} + // "Union" of all table types. struct TTableDescriptor { 1: required Types.TTableId id @@ -262,6 +268,7 @@ struct TTableDescriptor { 15: optional TEsTable esTable 16: optional TOdbcTable odbcTable 17: optional THiveTable hiveTable + 18: optional TIcebergTable icebergTable } struct TDescriptorTable {