diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index d03014c3434429..c7db8e3fbe9b25 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -579,6 +579,8 @@ module.exports = [ "DROP VIEW", "HLL", "RECOVER", + "REFRESH DATABASE", + "REFRESH TABLE", "RESTORE", "SHOW ENCRYPTKEYS", "TRUNCATE TABLE", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 8a6903eda2bb4f..ba13f7082f54c9 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -584,6 +584,8 @@ module.exports = [ "DROP VIEW", "HLL", "RECOVER", + "REFRESH DATABASE", + "REFRESH TABLE", "RESTORE", "SHOW ENCRYPTKEYS", "SHOW RESOURCES", diff --git a/docs/en/extending-doris/iceberg-of-doris.md b/docs/en/extending-doris/iceberg-of-doris.md index e6a3f9cb522058..387b00cc219e00 100644 --- a/docs/en/extending-doris/iceberg-of-doris.md +++ b/docs/en/extending-doris/iceberg-of-doris.md @@ -97,8 +97,42 @@ Iceberg tables can be created in Doris in two ways. You do not need to declare t The progress of the table build in `iceberg_test_db` can be viewed by `HELP SHOW TABLE CREATION`. + +You can also create an Iceberg table by explicitly specifying the column definitions according to your needs. + +1. Create an Iceberg table + + ```sql + -- Syntax + CREATE [EXTERNAL] TABLE table_name ( + col_name col_type [NULL | NOT NULL] [COMMENT "comment"] + ) ENGINE = ICEBERG + [COMMENT "comment"] ) + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "icberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + -- Example: Mount iceberg_table under iceberg_db in Iceberg + CREATE TABLE `t_iceberg` ( + `id` int NOT NULL COMMENT "id number", + `name` varchar(10) NOT NULL COMMENT "user name" + ) ENGINE = ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + #### Parameter Description +- External Table Columns + - Column names should correspond to the Iceberg table + - The order of the columns needs to be consistent with the Iceberg table - ENGINE needs to be specified as ICEBERG - PROPERTIES property. - `iceberg.hive.metastore.uris`: Hive Metastore service address @@ -110,6 +144,18 @@ Iceberg tables can be created in Doris in two ways. You do not need to declare t Show table structure can be viewed by `HELP SHOW CREATE TABLE`. +### Synchronized mounts + +When the Iceberg table Schema changes, you can manually synchronize it with the `REFRESH` command, which will remove and rebuild the Iceberg external table in Doris, as seen in the `HELP REFRESH` help. + +```sql +-- Synchronize the Iceberg table +REFRESH TABLE t_iceberg; + +-- Synchronize the Iceberg database +REFRESH DATABASE iceberg_test_db; +``` + ## Data Type Matching The supported Iceberg column types correspond to Doris in the following table. @@ -134,7 +180,7 @@ The supported Iceberg column types correspond to Doris in the following table. | MAP | - | not supported | **Note:** -- Iceberg table Schema changes **are not automatically synchronized** and require rebuilding the Iceberg external tables or database in Doris. +- Iceberg table Schema changes **are not automatically synchronized** and require synchronization of Iceberg external tables or databases in Doris via the `REFRESH` command. - The current default supported version of Iceberg is 0.12.0 and has not been tested in other versions. More versions will be supported in the future. ### Query Usage @@ -144,3 +190,22 @@ Once you have finished building the Iceberg external table in Doris, it is no di ```sql select * from t_iceberg where k1 > 1000 and k3 = 'term' or k4 like '%doris'; ``` + +## Related system configurations + +### FE Configuration + +The following configurations are at the Iceberg external table system level and can be configured by modifying `fe.conf` or by `ADMIN SET CONFIG`. + +- `iceberg_table_creation_strict_mode` + + Iceberg tables are created with strict mode enabled by default. + strict mode means that the column types of the Iceberg table are strictly filtered, and if there are data types that Doris does not currently support, the creation of the table will fail. + +- `iceberg_table_creation_interval_second` + + The background task execution interval for automatic creation of Iceberg tables, default is 10s. + +- `max_iceberg_table_creation_record_size` + + The maximum value reserved for Iceberg table creation records, default is 2000. Only for creating Iceberg database records. diff --git a/docs/en/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md b/docs/en/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md new file mode 100644 index 00000000000000..805b4a06d4d8d5 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md @@ -0,0 +1,45 @@ +--- +{ + "title": "REFRESH DATABASE", + "language": "en" +} +--- + + + +# REFRESH DATABASE + +## Description + + This statement is used to synchronize the remote Iceberg database and will delete and rebuild the Iceberg tables under the current Doris database, leaving the non-Iceberg tables unaffected. + Syntax: + REFRESH DATABASE db_name; + + Instructions. + 1) Valid only for the Iceberg database mounted in Doris. + +## Example + + 1) Refresh the database iceberg_test_db + REFRESH DATABASE iceberg_test_db; + +## keyword + + REFRESH,DATABASE diff --git a/docs/en/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md new file mode 100644 index 00000000000000..69455ca0433f44 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md @@ -0,0 +1,45 @@ +--- +{ + "title": "REFRESH TABLE", + "language": "en" +} +--- + + + +# REFRESH TABLE + +## Description + + This statement is used to synchronize a remote Iceberg table and will delete and rebuild Doris' current external table. + Syntax. + REFRESH TABLE tbl_name; + + Instructions. + 1) Valid only for the Iceberg table mounted in Doris. + +## Example + + 1) Refresh the table iceberg_tbl + REFRESH TABLE iceberg_tbl; + +## keyword + + REFRESH,TABLE diff --git a/docs/zh-CN/extending-doris/iceberg-of-doris.md b/docs/zh-CN/extending-doris/iceberg-of-doris.md index 7faeb9e64ec34c..cf3450c2b20cea 100644 --- a/docs/zh-CN/extending-doris/iceberg-of-doris.md +++ b/docs/zh-CN/extending-doris/iceberg-of-doris.md @@ -97,8 +97,41 @@ Iceberg External Table of Doris 提供了 Doris 直接访问 Iceberg 外部表 `iceberg_test_db` 中的建表进度可以通过 `HELP SHOW TABLE CREATION` 查看。 +也可以根据自己的需求明确指定列定义来创建 Iceberg 外表。 + +1. 创一个 Iceberg 外表 + + ```sql + -- 语法 + CREATE [EXTERNAL] TABLE table_name ( + col_name col_type [NULL | NOT NULL] [COMMENT "comment"] + ) ENGINE = ICEBERG + [COMMENT "comment"] + PROPERTIES ( + "iceberg.database" = "iceberg_db_name", + "iceberg.table" = "icberg_table_name", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + + -- 例子:挂载 Iceberg 中 iceberg_db 下的 iceberg_table + CREATE TABLE `t_iceberg` ( + `id` int NOT NULL COMMENT "id number", + `name` varchar(10) NOT NULL COMMENT "user name" + ) ENGINE = ICEBERG + PROPERTIES ( + "iceberg.database" = "iceberg_db", + "iceberg.table" = "iceberg_table", + "iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083", + "iceberg.catalog.type" = "HIVE_CATALOG" + ); + ``` + #### 参数说明: +- 外表列 + - 列名要于 Iceberg 表一一对应 + - 列的顺序需要与 Iceberg 表一致 - ENGINE 需要指定为 ICEBERG - PROPERTIES 属性: - `iceberg.hive.metastore.uris`:Hive Metastore 服务地址 @@ -109,6 +142,18 @@ Iceberg External Table of Doris 提供了 Doris 直接访问 Iceberg 外部表 ### 展示表结构 展示表结构可以通过 `HELP SHOW CREATE TABLE` 查看。 + +### 同步挂载 + +当 Iceberg 表 Schema 发生变更时,可以通过 `REFRESH` 命令手动同步,该命令会将 Doris 中的 Iceberg 外表删除重建,具体帮助可以通过 `HELP REFRESH` 查看。 + +```sql +-- 同步 Iceberg 表 +REFRESH TABLE t_iceberg; + +-- 同步 Iceberg 数据库 +REFRESH DATABASE iceberg_test_db; +``` ## 类型匹配 @@ -134,7 +179,7 @@ Iceberg External Table of Doris 提供了 Doris 直接访问 Iceberg 外部表 | MAP | - | 不支持 | **注意:** -- Iceberg 表 Schema 变更**不会自动同步**,需要在 Doris 中重建 Iceberg 外表或数据库。 +- Iceberg 表 Schema 变更**不会自动同步**,需要在 Doris 中通过 `REFRESH` 命令同步 Iceberg 外表或数据库。 - 当前默认支持的 Iceberg 版本为 0.12.0,未在其他版本进行测试。后续后支持更多版本。 ### 查询用法 @@ -144,3 +189,22 @@ Iceberg External Table of Doris 提供了 Doris 直接访问 Iceberg 外部表 ```sql select * from t_iceberg where k1 > 1000 and k3 ='term' or k4 like '%doris'; ``` + +## 相关系统配置 + +### FE配置 + +下面几个配置属于 Iceberg 外表系统级别的配置,可以通过修改 `fe.conf` 来配置,也可以通过 `ADMIN SET CONFIG` 来配置。 + +- `iceberg_table_creation_strict_mode` + + 创建 Iceberg 表默认开启 strict mode。 + strict mode 是指对 Iceberg 表的列类型进行严格过滤,如果有 Doris 目前不支持的数据类型,则创建外表失败。 + +- `iceberg_table_creation_interval_second` + + 自动创建 Iceberg 表的后台任务执行间隔,默认为 10s。 + +- `max_iceberg_table_creation_record_size` + + Iceberg 表创建记录保留的最大值,默认为 2000. 仅针对创建 Iceberg 数据库记录。 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md new file mode 100644 index 00000000000000..b6c545a267fc6f --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH DATABASE.md @@ -0,0 +1,46 @@ +--- +{ + "title": "REFRESH DATABASE", + "language": "zh-CN" +} +--- + + + +# REFRESH DATABASE + +## Description + + 该语句用于同步远端 Iceberg 数据库,会将 Doris 当前数据库下的 Iceberg 外表删除重建,非 Iceberg 外表不受影响。 + 语法: + REFRESH DATABASE db_name; + + 说明: + 1) 仅针对 Doris 中挂载的 Iceberg 数据库有效。 + +## Example + + 1. 刷新数据库 iceberg_test_db + REFRESH DATABASE iceberg_test_db; + +## keyword + + REFRESH,DATABASE + diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md new file mode 100644 index 00000000000000..401ba50af7cc1d --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/REFRESH TABLE.md @@ -0,0 +1,46 @@ +--- +{ + "title": "REFRESH TABLE", + "language": "zh-CN" +} +--- + + + +# REFRESH TABLE + +## Description + + 该语句用于同步远端 Iceberg 表,会将 Doris 当前的外表删除重建。 + 语法: + REFRESH TABLE tbl_name; + + 说明: + 1) 仅针对 Doris 中挂载的 Iceberg 表有效。 + +## Example + + 1. 刷新表 iceberg_tbl + REFRESH TABLE iceberg_tbl; + +## keyword + + REFRESH,TABLE + diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 4e591706561036..14b0f11aaec806 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_PLUGIN, KW_PLUGINS, KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY, KW_QUERY, KW_QUOTA, - KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REGEXP, KW_RELEASE, KW_RENAME, + KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME, KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE, KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS, KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED, @@ -297,7 +297,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt, - import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt; + import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt; nonterminal String transaction_label; nonterminal ImportColumnDesc import_column_desc; @@ -741,6 +741,8 @@ stmt ::= {: RESULT = stmt; :} | unlock_tables_stmt:stmt {: RESULT = stmt; :} + | refresh_stmt:stmt + {: RESULT = stmt; :} | /* empty: query only has comments */ {: RESULT = new EmptyStmt(); @@ -769,6 +771,17 @@ des_cluster_name ::= :} ; +refresh_stmt ::= + KW_REFRESH KW_TABLE table_name:tbl + {: + RESULT = new RefreshTableStmt(tbl); + :} + | KW_REFRESH KW_DATABASE ident:db + {: + RESULT = new RefreshDbStmt(db); + :} + ; + // plugin statement install_plugin_stmt ::= KW_INSTALL KW_PLUGIN KW_FROM ident_or_text:source opt_properties:properties @@ -5568,6 +5581,8 @@ keyword ::= {: RESULT = id; :} | KW_RECOVER:id {: RESULT = id; :} + | KW_REFRESH:id + {: RESULT = id; :} | KW_REPEATABLE:id {: RESULT = id; :} | KW_REPLACE:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java new file mode 100644 index 00000000000000..c87863de9972e8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.InfoSchemaDb; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.base.Strings; + +public class RefreshDbStmt extends DdlStmt { + private static final Logger LOG = LogManager.getLogger(RefreshDbStmt.class); + + private String dbName; + + public RefreshDbStmt(String dbName) { + this.dbName = dbName; + } + + public String getDbName() { + return dbName; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_DB_NAME, dbName); + } + if (Strings.isNullOrEmpty(analyzer.getClusterName())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER); + } + dbName = ClusterNamespace.getFullName(getClusterName(), dbName); + + // Don't allow dropping 'information_schema' database + if (dbName.equalsIgnoreCase(ClusterNamespace.getFullName(getClusterName(), InfoSchemaDb.DATABASE_NAME))) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, analyzer.getQualifiedUser(), dbName); + } + // check access + if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, PrivPredicate.DROP)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, + ConnectContext.get().getQualifiedUser(), dbName); + } + if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, PrivPredicate.CREATE)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, analyzer.getQualifiedUser(), dbName); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("REFRESH DATABASE ").append("`").append(dbName).append("`"); + return sb.toString(); + } + + @Override + public String toString() { + return toSql(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshTableStmt.java new file mode 100644 index 00000000000000..a6ff7772404bb5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshTableStmt.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class RefreshTableStmt extends DdlStmt { + private static final Logger LOG = LogManager.getLogger(RefreshTableStmt.class); + + private TableName tableName; + + public RefreshTableStmt(TableName tableName) { + this.tableName = tableName; + } + + public String getDbName() { + return tableName.getDb(); + } + + public String getTblName() { + return tableName.getTbl(); + } + + public TableName getTableName() { + return tableName; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + tableName.analyze(analyzer); + + // check access + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), tableName.getDb(), + tableName.getTbl(), PrivPredicate.DROP)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "DROP"); + } + + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), tableName.getDb(), + tableName.getTbl(), PrivPredicate.CREATE)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CREATE"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("REFRESH TABLE ").append(tableName.toSql()); + return sb.toString(); + } + + @Override + public String toString() { + return toSql(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 304c04bec36552..daf2639c402a9b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -450,6 +450,8 @@ public class Catalog { private AuditEventProcessor auditEventProcessor; + private RefreshManager refreshManager; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -616,6 +618,7 @@ private Catalog(boolean isCheckpointCatalog) { this.pluginMgr = new PluginMgr(); this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr); + this.refreshManager = new RefreshManager(); } public static void destroyCheckpoint() { @@ -2642,7 +2645,7 @@ public void createDb(CreateDbStmt stmt) throws DdlException { Database db = new Database(id, fullDbName); db.setClusterName(clusterName); // check and analyze database properties before create database - db.getDbProperties().addAndBuildProperties(properties); + db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties()); if (!tryLock(false)) { throw new DdlException("Failed to acquire catalog lock. Try again"); @@ -5163,6 +5166,10 @@ public SmallFileMgr getSmallFileMgr() { return this.smallFileMgr; } + public RefreshManager getRefreshManager() { + return this.refreshManager; + } + public long getReplayedJournalId() { return this.replayedJournalId.get(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 3817c346147125..db93dd9ec8a103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -226,6 +226,10 @@ public DatabaseProperty getDbProperties() { return dbProperties; } + public void setDbProperties(DatabaseProperty dbProperties) { + this.dbProperties = dbProperties; + } + public long getUsedDataQuotaWithLock() { long usedDataQuota = 0; readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java index ec3e830b2d3331..b5bdcee979220f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java @@ -57,6 +57,10 @@ public DatabaseProperty() { } + public DatabaseProperty(Map properties) { + this.properties = properties; + } + public void put(String key, String val) { properties.put(key, val); } @@ -77,8 +81,7 @@ public IcebergProperty getIcebergProperty() { return icebergProperty; } - public void addAndBuildProperties(Map properties) throws DdlException { - this.properties.putAll(properties); + public DatabaseProperty checkAndBuildProperties() throws DdlException { Map icebergProperties = new HashMap<>(); for (Map.Entry entry : this.properties.entrySet()) { if (entry.getKey().startsWith(ICEBERG_PROPERTY_PREFIX)) { @@ -88,6 +91,7 @@ public void addAndBuildProperties(Map properties) throws DdlExce if (icebergProperties.size() > 0) { checkAndBuildIcebergProperty(icebergProperties); } + return this; } private void checkAndBuildIcebergProperty(Map properties) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java index 9162d87ada383b..ffc933eb7ae065 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java @@ -47,6 +47,15 @@ public IcebergProperty(Map properties) { } } + // Create a new Iceberg property from other property + public IcebergProperty(IcebergProperty otherProperty) { + this.exist = otherProperty.exist; + this.database = otherProperty.database; + this.table = otherProperty.table; + this.hiveMetastoreUris = otherProperty.hiveMetastoreUris; + this.catalogType = otherProperty.catalogType; + } + public boolean isExist() { return exist; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java new file mode 100644 index 00000000000000..a6bf673eb22511 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.RefreshDbStmt; +import org.apache.doris.analysis.RefreshTableStmt; +import org.apache.doris.analysis.TableName; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +// Manager for refresh database and table action +public class RefreshManager { + private static final Logger LOG = LogManager.getLogger(RefreshManager.class); + + public void handleRefreshTable(RefreshTableStmt stmt) throws UserException { + String dbName = stmt.getDbName(); + String tableName = stmt.getTblName(); + Catalog catalog = Catalog.getCurrentCatalog(); + + // 0. check table type + Database db = catalog.getDbOrDdlException(dbName); + Table table = db.getTableNullable(tableName); + if (!(table instanceof IcebergTable)) { + throw new DdlException("Only support refresh Iceberg table."); + } + + // 1. get iceberg properties + Map icebergProperties = ((IcebergTable) table).getIcebergProperties(); + icebergProperties.put(IcebergProperty.ICEBERG_TABLE, ((IcebergTable) table).getIcebergTbl()); + icebergProperties.put(IcebergProperty.ICEBERG_DATABASE, ((IcebergTable) table).getIcebergDb()); + + // 2. drop old table + DropTableStmt dropTableStmt = new DropTableStmt(true, stmt.getTableName(), true); + catalog.dropTable(dropTableStmt); + + // 3. create new table + CreateTableStmt createTableStmt = new CreateTableStmt(true, true, + stmt.getTableName(), "ICEBERG", icebergProperties, ""); + catalog.createTable(createTableStmt); + + LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName); + } + + public void handleRefreshDb(RefreshDbStmt stmt) throws DdlException { + String dbName = stmt.getDbName(); + Catalog catalog = Catalog.getCurrentCatalog(); + + Database db = catalog.getDbOrDdlException(dbName); + + // 0. build iceberg property + // Since we have only persisted database properties with key-value format in DatabaseProperty, + // we build IcebergProperty here, before checking database type. + db.getDbProperties().checkAndBuildProperties(); + // 1. check database type + if (!db.getDbProperties().getIcebergProperty().isExist()) { + throw new DdlException("Only support refresh Iceberg database."); + } + + // 2. only drop iceberg table in the database + // Current database may have other types of table, which is not allowed to drop. + for (Table table : db.getTables()) { + if (table instanceof IcebergTable) { + DropTableStmt dropTableStmt = new DropTableStmt(true, new TableName(dbName, table.getName()), true); + catalog.dropTable(dropTableStmt); + } + } + + // 3. register iceberg database to recreate iceberg table + catalog.getIcebergTableCreationRecordMgr().registerDb(db); + + LOG.info("Successfully refresh db: {}", dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index ecbc97ba918581..21a84308804b1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1087,9 +1087,9 @@ public class Config extends ConfigBase { public static long es_state_sync_interval_second = 10; /** - * fe will create iceberg table every es_state_sync_interval_secs + * fe will create iceberg table every iceberg_table_creation_interval_second */ - @ConfField + @ConfField(mutable = true, masterOnly = true) public static long iceberg_table_creation_interval_second = 10; /** @@ -1613,4 +1613,13 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static long min_bytes_indicate_replica_too_large = 2 * 1024 * 1024 * 1024L; + + /** + * If set to TRUE, the column definitions of iceberg table and the doris table must be consistent + * If set to FALSE, Doris only creates columns of supported data types. + * Default is true. + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean iceberg_table_creation_strict_mode = true; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java index 115c89c24cb8a6..88d64f02d6e6b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java @@ -135,6 +135,7 @@ public static void validateProperties(Map properties, boolean is /** * Get Doris IcebergTable from remote Iceberg by database and table + * @param tableId table id in Doris * @param tableName table name in Doris * @param icebergProperty Iceberg property * @param identifier Iceberg table identifier @@ -142,10 +143,9 @@ public static void validateProperties(Map properties, boolean is * @return IcebergTable in Doris * @throws DdlException */ - public static IcebergTable getTableFromIceberg(String tableName, IcebergProperty icebergProperty, + public static IcebergTable getTableFromIceberg(long tableId, String tableName, IcebergProperty icebergProperty, TableIdentifier identifier, boolean isTable) throws DdlException { - long tableId = getNextId(); IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); if (isTable && !icebergCatalog.tableExists(identifier)) { @@ -188,8 +188,19 @@ public static void createIcebergTable(Database db, CreateTableStmt stmt) throws String icebergDb = icebergProperty.getDatabase(); String icebergTbl = icebergProperty.getTable(); - IcebergTable table = getTableFromIceberg(tableName, icebergProperty, - TableIdentifier.of(icebergDb, icebergTbl), true); + // create iceberg table struct + // 1. Already set column def in Create Stmt, just create table + // 2. No column def in Create Stmt, get it from remote Iceberg schema. + IcebergTable table; + long tableId = getNextId(); + if (stmt.getColumns().size() > 0) { + // set column def in CREATE TABLE + table = new IcebergTable(tableId, tableName, stmt.getColumns(), icebergProperty, null); + } else { + // get column def from remote Iceberg + table = getTableFromIceberg(tableId, tableName, icebergProperty, + TableIdentifier.of(icebergDb, icebergTbl), true); + } // check iceberg table if exists in doris database if (!db.createTableWithLock(table, false, stmt.isSetIfNotExists()).first) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java index a20246308705bf..125818186a38fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java @@ -29,13 +29,18 @@ public class IcebergTableCreationRecord { private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecord.class); + private long dbId; + private long tableId; private String db; private String table; private String status; private String createTime; private String errorMsg; - public IcebergTableCreationRecord(String db, String table, String status, String createTime, String errorMsg) { + public IcebergTableCreationRecord(long dbId, long tableId, String db, String table, String status, + String createTime, String errorMsg) { + this.dbId = dbId; + this.tableId = tableId; this.db = db; this.table = table; this.status = status; @@ -53,6 +58,14 @@ public List getTableCreationRecord() { return record; } + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + public String getDb() { return db; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java index 7cbed68860eb4e..24c850d42b3a0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java @@ -25,6 +25,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.common.property.PropertySchema; import org.apache.doris.common.util.MasterDaemon; @@ -63,8 +64,8 @@ public class IcebergTableCreationRecordMgr extends MasterDaemon { // used to create table private Map> dbToTableIdentifiers = Maps.newConcurrentMap(); // table creation records, used for show stmt - // db -> table -> create msg - private Map> dbToTableToCreationRecord = Maps.newConcurrentMap(); + // dbId -> tableId -> create msg + private Map> dbToTableToCreationRecord = Maps.newConcurrentMap(); private Queue tableCreationRecordQueue = new PriorityQueue<>(new TableCreationComparator()); private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -94,8 +95,8 @@ private void registerTable(Database db, TableIdentifier identifier, IcebergPrope public void deregisterDb(Database db) { icebergDbs.remove(db.getId()); dbToTableIdentifiers.remove(db); - dbToTableToCreationRecord.remove(db.getFullName()); - LOG.info("Deregister database[{}]", db.getFullName()); + dbToTableToCreationRecord.remove(db.getId()); + LOG.info("Deregister database[{}-{}]", db.getFullName(), db.getId()); } public void deregisterTable(Database db, IcebergTable table) { @@ -104,21 +105,21 @@ public void deregisterTable(Database db, IcebergTable table) { Map identifierToProperties = dbToTableIdentifiers.get(db); identifierToProperties.remove(identifier); } - if (dbToTableToCreationRecord.containsKey(db.getFullName())) { - Map recordMap = dbToTableToCreationRecord.get(db.getFullName()); - recordMap.remove(table.getName()); + if (dbToTableToCreationRecord.containsKey(db.getId())) { + Map recordMap = dbToTableToCreationRecord.get(db.getId()); + recordMap.remove(table.getId()); } - LOG.info("Deregister table[{}] from database[{}]", table.getName(), db.getFullName()); + LOG.info("Deregister table[{}-{}] from database[{}-{}]", table.getName(), + table.getId(), db.getFullName(), db.getId()); } // remove already created tables or failed tables private void removeDuplicateTables() { - for (Map.Entry> entry : dbToTableToCreationRecord.entrySet()) { - String dbName = entry.getKey(); - Catalog.getCurrentCatalog().getDb(dbName).ifPresent(db -> { + for (Map.Entry> entry : dbToTableToCreationRecord.entrySet()) { + Catalog.getCurrentCatalog().getDb(entry.getKey()).ifPresent(db -> { if (dbToTableIdentifiers.containsKey(db)) { - for (Map.Entry innerEntry : entry.getValue().entrySet()) { - String tableName = innerEntry.getKey(); + for (Map.Entry innerEntry : entry.getValue().entrySet()) { + String tableName = innerEntry.getValue().getTable(); String icebergDbName = db.getDbProperties().getIcebergProperty().getDatabase(); TableIdentifier identifier = TableIdentifier.of(icebergDbName, tableName); dbToTableIdentifiers.get(db).remove(identifier); @@ -142,7 +143,7 @@ protected void runAfterCatalogReady() { try { icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); } catch (DdlException e) { - addTableCreationRecord(db.getFullName(), "", FAIL, + addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL, prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); LOG.warn("Failed get Iceberg catalog, hive.metastore.uris[{}], error: {}", icebergProperty.getHiveMetastoreUris(), e.getMessage()); @@ -152,14 +153,15 @@ protected void runAfterCatalogReady() { icebergTables = icebergCatalog.listTables(icebergProperty.getDatabase()); } catch (DorisIcebergException e) { - addTableCreationRecord(db.getFullName(), "", FAIL, + addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL, prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); LOG.warn("Failed list remote Iceberg database, hive.metastore.uris[{}], database[{}], error: {}", icebergProperty.getHiveMetastoreUris(), icebergProperty.getDatabase(), e.getMessage()); } for (TableIdentifier identifier : icebergTables) { - icebergProperty.setTable(identifier.name()); - registerTable(db, identifier, icebergProperty); + IcebergProperty tableProperties = new IcebergProperty(icebergProperty); + tableProperties.setTable(identifier.name()); + registerTable(db, identifier, tableProperties); } } @@ -169,20 +171,21 @@ protected void runAfterCatalogReady() { for (Map.Entry innerEntry : entry.getValue().entrySet()) { TableIdentifier identifier = innerEntry.getKey(); IcebergProperty icebergProperty = innerEntry.getValue(); + long tableId = SystemIdGenerator.getNextId(); try { // get doris table from iceberg - IcebergTable table = IcebergCatalogMgr.getTableFromIceberg(identifier.name(), + IcebergTable table = IcebergCatalogMgr.getTableFromIceberg(tableId, identifier.name(), icebergProperty, identifier, false); // check iceberg table if exists in doris database if (!db.createTableWithLock(table, false, false).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, table.getName(), ErrorCode.ERR_TABLE_EXISTS_ERROR.getCode()); } - addTableCreationRecord(db.getFullName(), table.getName(), SUCCESS, + addTableCreationRecord(db.getId(), tableId, db.getFullName(), table.getName(), SUCCESS, prop.writeTimeFormat(new Date(System.currentTimeMillis())), ""); - LOG.info("Successfully create table[{}-{}]", table.getName(), table.getId()); + LOG.info("Successfully create table[{}-{}]", table.getName(), tableId); } catch (Exception e) { - addTableCreationRecord(db.getFullName(), identifier.name(), FAIL, + addTableCreationRecord(db.getId(), tableId, db.getFullName(), identifier.name(), FAIL, prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); LOG.warn("Failed create table[{}], error: {}", identifier.name(), e.getMessage()); } @@ -191,19 +194,18 @@ protected void runAfterCatalogReady() { removeDuplicateTables(); } - private void addTableCreationRecord(String db, String table, String status, String createTime, String errorMsg) { + private void addTableCreationRecord(long dbId, long tableId, String db, String table, String status, + String createTime, String errorMsg) { writeLock(); try { while (isQueueFull()) { IcebergTableCreationRecord record = tableCreationRecordQueue.poll(); if (record != null) { - String recordDb = record.getDb(); - String recordTable = record.getTable(); - Map tableRecords = dbToTableToCreationRecord.get(recordDb); - Iterator> tableRecordsIterator = tableRecords.entrySet().iterator(); + Map tableRecords = dbToTableToCreationRecord.get(record.getDbId()); + Iterator> tableRecordsIterator = tableRecords.entrySet().iterator(); while (tableRecordsIterator.hasNext()) { - String t = tableRecordsIterator.next().getKey(); - if (t.equals(recordTable)) { + long t = tableRecordsIterator.next().getKey(); + if (t == record.getTableId()) { tableRecordsIterator.remove(); break; } @@ -211,31 +213,32 @@ private void addTableCreationRecord(String db, String table, String status, Stri } } - IcebergTableCreationRecord record = new IcebergTableCreationRecord(db, table, status, createTime, errorMsg); + IcebergTableCreationRecord record = new IcebergTableCreationRecord(dbId, tableId, db, table, status, + createTime, errorMsg); tableCreationRecordQueue.offer(record); - if (!dbToTableToCreationRecord.containsKey(db)) { - dbToTableToCreationRecord.put(db, new ConcurrentHashMap<>()); + if (!dbToTableToCreationRecord.containsKey(dbId)) { + dbToTableToCreationRecord.put(dbId, new ConcurrentHashMap<>()); } - Map tableToRecord = dbToTableToCreationRecord.get(db); - if (!tableToRecord.containsKey(table)) { - tableToRecord.put(table, record); + Map tableToRecord = dbToTableToCreationRecord.get(dbId); + if (!tableToRecord.containsKey(tableId)) { + tableToRecord.put(tableId, record); } } finally { writeUnlock(); } } - public List getTableCreationRecordByDb(String db) { + public List getTableCreationRecordByDbId(long dbId) { List records = new ArrayList<>(); readLock(); try { - if (!dbToTableToCreationRecord.containsKey(db)) { + if (!dbToTableToCreationRecord.containsKey(dbId)) { return records; } - Map tableToRecords = dbToTableToCreationRecord.get(db); - for (Map.Entry entry : tableToRecords.entrySet()) { + Map tableToRecords = dbToTableToCreationRecord.get(dbId); + for (Map.Entry entry : tableToRecords.entrySet()) { records.add(entry.getValue()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java index 38359b2efd7203..779860baf441c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java @@ -34,6 +34,7 @@ import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.thrift.TExprOpcode; @@ -110,12 +111,21 @@ public static Schema createIcebergSchema(List columnDefs) throws User public static List createSchemaFromIcebergSchema(Schema schema) throws DdlException { List columns = Lists.newArrayList(); for (Types.NestedField nestedField : schema.columns()) { - columns.add(nestedFieldToColumn(nestedField)); + try { + columns.add(nestedFieldToColumn(nestedField)); + } catch (UnsupportedOperationException e) { + if (Config.iceberg_table_creation_strict_mode) { + throw e; + } + LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}", + nestedField.name(), e.getMessage()); + continue; + } } return columns; } - public static Column nestedFieldToColumn(Types.NestedField field) throws DdlException { + public static Column nestedFieldToColumn(Types.NestedField field) { Type type = convertIcebergToDoris(field.type()); return new Column(field.name(), type, true, null, field.isOptional(), null, field.doc()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 74862e2d2b819f..c66349a2ce24e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -81,6 +81,8 @@ import org.apache.doris.analysis.RecoverDbStmt; import org.apache.doris.analysis.RecoverPartitionStmt; import org.apache.doris.analysis.RecoverTableStmt; +import org.apache.doris.analysis.RefreshDbStmt; +import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; import org.apache.doris.analysis.ResumeSyncJobStmt; @@ -287,6 +289,10 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getSqlBlockRuleMgr().dropSqlBlockRule((DropSqlBlockRuleStmt) ddlStmt); } else if (ddlStmt instanceof AlterDatabasePropertyStmt) { throw new DdlException("Not implemented yet"); + } else if (ddlStmt instanceof RefreshTableStmt) { + catalog.getRefreshManager().handleRefreshTable((RefreshTableStmt) ddlStmt); + } else if (ddlStmt instanceof RefreshDbStmt) { + catalog.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index d612a511aaa860..5c79b2079f4388 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -2064,7 +2064,8 @@ private void handleShowTableCreation() throws AnalysisException { String dbName = showStmt.getDbName(); Database db = ctx.getCatalog().getDbOrAnalysisException(dbName); - List records = ctx.getCatalog().getIcebergTableCreationRecordMgr().getTableCreationRecordByDb(dbName); + List records = + ctx.getCatalog().getIcebergTableCreationRecordMgr().getTableCreationRecordByDbId(db.getId()); List> rowSet = Lists.newArrayList(); for (IcebergTableCreationRecord record : records) { @@ -2075,9 +2076,9 @@ private void handleShowTableCreation() throws AnalysisException { } } - // sort function rows by first column asc + // sort function rows by fourth column (Create Time) asc ListComparator> comparator = null; - OrderByPair orderByPair = new OrderByPair(0, false); + OrderByPair orderByPair = new OrderByPair(3, false); comparator = new ListComparator<>(orderByPair); Collections.sort(rowSet, comparator); List> resultRowSet = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index c87a65e3e3be8c..86ccd72914b18f 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -318,6 +318,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ)); keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE)); keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER)); + keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH)); keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP)); keywordMap.put("release", new Integer(SqlParserSymbols.KW_RELEASE)); keywordMap.put("rename", new Integer(SqlParserSymbols.KW_RENAME));