From 3e9a636c1b59b970222b66430e629c0122cec71b Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Mon, 12 Aug 2024 18:47:37 +0800 Subject: [PATCH 1/9] for test. --- docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl | 3 +++ regression-test/pipeline/external/conf/fe.conf | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl index 0e07422841098a..b7e662f5e524bf 100644 --- a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl +++ b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl @@ -28,6 +28,9 @@ HIVE_SITE_CONF_hive_server2_webui_port=0 HIVE_SITE_CONF_hive_compactor_initiator_on=true HIVE_SITE_CONF_hive_compactor_worker_threads=2 HIVE_SITE_CONF_metastore_storage_schema_reader_impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader +HIVE_SITE_CONF_hive_metastore_event_db_notification_api_auth=false +HIVE_SITE_CONF_hive_metastore_dml_events=true +HIVE_SITE_CONF_hive_metastore_transactional_event_listeners=org.apache.hive.hcatalog.listener.DbNotificationListener CORE_CONF_fs_defaultFS=hdfs://${IP_HOST}:${FS_PORT} CORE_CONF_hadoop_http_staticuser_user=root diff --git a/regression-test/pipeline/external/conf/fe.conf b/regression-test/pipeline/external/conf/fe.conf index 8eed72816e8138..b8ed430128b3f1 100644 --- a/regression-test/pipeline/external/conf/fe.conf +++ b/regression-test/pipeline/external/conf/fe.conf @@ -96,5 +96,9 @@ auth_token = 5ff161c3-2c08-4079-b108-26c8850b6598 infodb_support_ext_catalog=true trino_connector_plugin_dir=/tmp/trino_connector/connectors +enable_hms_events_incremental_sync=true +hms_events_polling_interval_ms=2000 +hms_events_batch_size_per_rpc=10000 + KRB5_CONFIG=/keytabs/krb5.conf From 2ea55ae733f38dda64778b9ab9d6a3bbce152574 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Wed, 14 Aug 2024 19:25:09 +0800 Subject: [PATCH 2/9] change test. --- .../apache/doris/datasource/CatalogMgr.java | 28 +- .../doris/datasource/ExternalDatabase.java | 1 + .../hive/event/AddPartitionEvent.java | 1 + .../hive/event/AlterPartitionEvent.java | 4 +- .../datasource/hive/event/InsertEvent.java | 23 +- .../hive/event/MetastoreEventFactory.java | 2 + .../hive/event/MetastoreEventsProcessor.java | 17 +- .../doris/datasource/metacache/MetaCache.java | 4 + .../hive/test_hms_event_notification_2.groovy | 378 ++++++++++++++++++ 9 files changed, 423 insertions(+), 35 deletions(-) create mode 100644 regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index b8c88c9c08e11d..d251693d59520d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -655,13 +655,13 @@ public void registerExternalTableFromEvent(String dbName, String tableName, return; } - TableIf table = db.getTableNullable(tableName); - if (table != null) { - if (!ignoreIfExists) { - throw new DdlException("Table " + tableName + " has exist in db " + dbName); - } - return; - } + // TableIf table = db.getTableNullable(tableName); + // if (table != null) { + // if (!ignoreIfExists) { + // throw new DdlException("Table " + tableName + " has exist in db " + dbName); + // } + // return; + // } long tblId; HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; if (hmsCatalog.getUseMetaCache().get()) { @@ -712,13 +712,13 @@ public void registerExternalDatabaseFromEvent(String dbName, String catalogName, if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support create ExternalCatalog databases"); } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db != null) { - if (!ignoreIfExists) { - throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName()); - } - return; - } + // DatabaseIf db = catalog.getDbNullable(dbName); + // if (db != null) { + // if (!ignoreIfExists) { + // throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName()); + // } + // return; + // } HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; long dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index dc6f9aaea73d8c..4d1ef080a51045 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -488,6 +488,7 @@ public boolean registerTable(TableIf tableIf) { if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { metaCache.updateCache(tableName, (T) tableIf); + metaCache.setIdToName(tableId, tableName); } } else { tableNameToId.put(tableName, tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index ffc7b95ff59aad..1bd388f75af845 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -105,6 +105,7 @@ protected void process() throws MetastoreNotificationException { infoLog("Partition list is empty. Ignoring this event."); return; } + // add meta cache Env.getCurrentEnv().getCatalogMgr() .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, eventTime, true); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 6be0215f143409..569d9878d7afaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -145,9 +145,7 @@ protected boolean canBeBatched(MetastoreEvent that) { // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has // else just remove `that` event's relevant partitions for (String partitionName : getAllPartitionNames()) { - if (thatPartitionEvent instanceof AddPartitionEvent) { - ((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName); - } else if (thatPartitionEvent instanceof DropPartitionEvent) { + if (thatPartitionEvent instanceof DropPartitionEvent) { ((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index f793ab8b0683b2..fbd45575b88577 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -24,8 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +// import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import java.util.List; @@ -33,13 +32,13 @@ * MetastoreEvent for INSERT event type */ public class InsertEvent extends MetastoreTableEvent { - private final Table hmsTbl; + // private final Table hmsTbl; // for test public InsertEvent(long eventId, String catalogName, String dbName, String tblName) { super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT); - this.hmsTbl = null; + // this.hmsTbl = null; } private InsertEvent(NotificationEvent event, String catalogName) { @@ -47,14 +46,14 @@ private InsertEvent(NotificationEvent event, String catalogName) { Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT)); Preconditions .checkNotNull(event.getMessage(), debugString("Event message is null")); - try { - InsertMessage insertMessage = - MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) - .getInsertMessage(event.getMessage()); - hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj()); - } catch (Exception ex) { - throw new MetastoreNotificationException(ex); - } + // try { + // InsertMessage insertMessage = + // MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) + // .getInsertMessage(event.getMessage()); + // // hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj()); + // } catch (Exception ex) { + // throw new MetastoreNotificationException(ex); + // } } protected static List getEvents(NotificationEvent event, String catalogName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index 493f1f7cb71276..c0bd4eaef22061 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -46,6 +46,7 @@ public List transferNotificationEventToMetastoreEvents(Notificat String catalogName) { Preconditions.checkNotNull(event.getEventType()); MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); + LOG.info("event = {}", event.toString()); switch (metastoreEventType) { case CREATE_TABLE: return CreateTableEvent.getEvents(event, catalogName); @@ -79,6 +80,7 @@ List getMetastoreEvents(List events, HMSExter for (NotificationEvent event : events) { metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName)); } + //try catch ??? List mergedEvents = mergeEvents(catalogName, metastoreEvents); if (Env.getCurrentEnv().isMaster()) { logMetaIdMappings(hmsExternalCatalog.getId(), events.get(events.size() - 1).getEventId(), mergedEvents); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 6e12c35e2b8455..9b6cfc455b1d3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -147,7 +147,7 @@ private List getNextHMSEvents(HMSExternalCatalog hmsExternalC response = getNextEventResponseForSlave(hmsExternalCatalog); } - if (response == null) { + if (response == null || response.getEventsSize() == 0) { return Collections.emptyList(); } return response.getEvents(); @@ -184,8 +184,8 @@ private void processEvents(List events, HMSExternalCatalog hm private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog) throws MetastoreNotificationFetchException { - long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); - long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); //165 + long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); //166 if (lastSyncedEventId < 0) { refreshCatalogForMaster(hmsExternalCatalog); // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events @@ -206,17 +206,22 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); return null; } - + LOG.info("(CYW)catalogname = {}, lastSyncedEventId = {}, currentEventId = {}", + hmsExternalCatalog.getName(), lastSyncedEventId, currentEventId); try { - return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, + NotificationEventResponse notificationEventResponse = + hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null); + LOG.info("(CYW)catalogname = {} successs.event size = {} ", + hmsExternalCatalog.getName(), notificationEventResponse.getEvents().size()); + return notificationEventResponse; } catch (MetastoreNotificationFetchException e) { // Need a fallback to handle this because this error state can not be recovered until restarting FE if (StringUtils.isNotEmpty(e.getMessage()) && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { refreshCatalogForMaster(hmsExternalCatalog); // set lastSyncedEventId to currentEventId after refresh catalog successfully - updateLastSyncedEventId(hmsExternalCatalog, currentEventId); + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); //TODO ??? may be not right LOG.warn("Notification events are missing, maybe an event can not be handled " + "or processing rate is too low, fallback to refresh the catalog"); return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index e3ad8668fb5525..c8e0a45012e564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -90,6 +90,10 @@ public Optional getMetaObjById(long id) { return name == null ? Optional.empty() : getMetaObj(name, id); } + public void setIdToName(long id, String name) { + idToName.put(id, name); + } + public void updateCache(String objName, T obj) { metaObjCache.put(objName, Optional.of(obj)); namesCache.asMap().compute("", (k, v) -> { diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy new file mode 100644 index 00000000000000..34e2ffb4b4ae70 --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy @@ -0,0 +1,378 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hms_event_notification_2", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + + for (String hivePrefix : [ "hive2","hive3"]) { + try { + setHivePrefix(hivePrefix) + hive_docker """ set hive.stats.autogather=false; """ + + + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "test_hms_event_notification_${hivePrefix}" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + int wait_time = 2000; + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "false" + );""" + sql """ switch ${catalog_name} """ + + String tb1 = """${catalog_name}_tb_1""" + String tb2 = """${catalog_name}_tb_2""" + String db1 = "${catalog_name}_db_1"; + String db2 = "${catalog_name}_db_2"; + String partition_tb = "${catalog_name}_partition_tb"; + + try { + hive_docker """ use ${db1};""" + }catch (Exception e){ + } + + hive_docker """ drop table if exists ${tb1};""" + hive_docker """ drop table if exists ${tb2};""" + hive_docker """ drop table if exists ${partition_tb} """ + hive_docker """ drop database if exists ${db1};""" + hive_docker """ drop database if exists ${db2};""" + +//CREATE DATABASE + hive_docker """ create database ${db1};""" + hive_docker """ create database ${db2};""" + sleep(wait_time); + + List> dbs = sql """ show databases """ + logger.info("result = " + dbs); + + int flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + + + +//ALTER DATABASE + if (hivePrefix == "hive3") { + String db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("db2 location = " + db2_location ) + + def loc_start = db2_location.indexOf("hdfs://") + def loc_end = db2_location.indexOf(".db") + 3 + db2_location = db2_location.substring(loc_start, loc_end) + logger.info("db2 location = " + db2_location ) + + String new_db2_location = db2_location.replace("warehouse", "new_warehouse_xxx") + logger.info("change db2 location to ${new_db2_location} ") + + logger.info(" alter database begin") + hive_docker """ ALTER DATABASE ${db2} SET LOCATION '${new_db2_location}'; """ + logger.info(" alter database end") + sleep(wait_time); + + String query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) + + assertTrue(query_db2_location == new_db2_location); + } + + +//DROP DATABASE + hive_docker """drop database ${db2}; """; + sleep(wait_time); + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + + +//CREATE TABLE + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + List> tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + assertTrue(tbs.isEmpty()) + + + hive_docker """ create table ${tb1} (id int,name string) ;""" + hive_docker """ create table ${tb2} (id int,name string) ;""" + sleep(wait_time); + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + int flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + + +//ALTER TABLE + List> ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.isEmpty()) + + hive_docker """ insert into ${tb1} select 1,"xxx"; """ + sleep(wait_time); + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + + hive_docker """ insert into ${tb1} values( 2,"yyy"); """ + sleep(wait_time); + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + hive_docker """ alter table ${tb1} change column id id bigint; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + + hive_docker """ alter table ${tb1} change column name new_name string; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + +//DROP TABLE + hive_docker """ drop table ${tb2} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb1}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + + + hive_docker """ drop table ${tb1} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + +//ADD PARTITION + + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + + hive_docker """ CREATE TABLE ${partition_tb} ( + id INT, + name STRING, + age INT + ) + PARTITIONED BY (country STRING); """ + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='USA') + VALUES (1, 'John Doe', 30), + (2, 'Jane Smith', 25);""" + + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='India') + VALUES (3, 'Rahul Kumar', 28), + (4, 'Priya Singh', 24); + """ + sleep(wait_time); + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + + List> pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + int flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + + hive_docker """ + ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + +//ALTER PARTITION + hive_docker """ + alter table ${partition_tb} partition(country='USA') rename to partition(country='US') ; + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + +//DROP PARTITION + hive_docker """ + ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + + + sql """drop catalog if exists ${catalog_name}""" + } finally { + } + } +} + + + + From b9bb1b95c1b81bb693c2b7640e81cfb79a8d55c5 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Thu, 15 Aug 2024 01:53:52 +0800 Subject: [PATCH 3/9] catalog add parameters --- .../doris/datasource/ExternalDatabase.java | 5 +- .../datasource/hive/HMSExternalCatalog.java | 18 ++ .../hive/event/AddPartitionEvent.java | 1 - .../hive/event/MetastoreEventsProcessor.java | 9 +- .../doris/datasource/metacache/MetaCache.java | 9 +- .../property/constants/HMSProperties.java | 3 +- .../hive/test_hms_event_notification_2.groovy | 298 +++++++++++++++++- 7 files changed, 334 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 4d1ef080a51045..2a7e42f769f948 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -451,6 +451,7 @@ public void gsonPostProcess() throws IOException { @Override public void unregisterTable(String tableName) { + makeSureInitialized(); if (LOG.isDebugEnabled()) { LOG.debug("create table [{}]", tableName); } @@ -459,6 +460,7 @@ public void unregisterTable(String tableName) { if (isInitialized()) { metaCache.invalidate(tableName); } + metaCache.idToNameRemove(Util.genIdByName(getQualifiedName(tableName))); } else { Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -480,6 +482,7 @@ public CatalogIf getCatalog() { // Only used for sync hive metastore event @Override public boolean registerTable(TableIf tableIf) { + makeSureInitialized(); long tableId = tableIf.getId(); String tableName = tableIf.getName(); if (LOG.isDebugEnabled()) { @@ -488,8 +491,8 @@ public boolean registerTable(TableIf tableIf) { if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { metaCache.updateCache(tableName, (T) tableIf); - metaCache.setIdToName(tableId, tableName); } + metaCache.setIdToName(Util.genIdByName(getQualifiedName(tableName)), tableName); } else { tableNameToId.put(tableName, tableId); idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index be9bf388adbd31..c2fb27bc496517 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -73,6 +73,10 @@ public class HMSExternalCatalog extends ExternalCatalog { @Getter private HadoopAuthenticator authenticator; + private int hmsEventsBatchSizePerRpc = -1; + private boolean enableHmsEventsIncrementalSync = false; + + @VisibleForTesting public HMSExternalCatalog() { catalogProperty = new CatalogProperty(null, null); @@ -101,6 +105,12 @@ public void checkProperties() throws DdlException { "The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond); } + enableHmsEventsIncrementalSync = + catalogProperty.getOrDefault(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC, "false").equals("true"); + + hmsEventsBatchSizePerRpc = + Integer.valueOf(catalogProperty.getOrDefault(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC, "-1")); + // check the dfs.ha properties // 'dfs.nameservices'='your-nameservice', // 'dfs.ha.namenodes.your-nameservice'='nn1,nn2', @@ -266,4 +276,12 @@ public String getHiveMetastoreUris() { public String getHiveVersion() { return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); } + + public int getHmsEventsBatchSizePerRpc() { + return hmsEventsBatchSizePerRpc; + } + + public boolean isEnableHmsEventsIncrementalSync() { + return enableHmsEventsIncrementalSync; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index 1bd388f75af845..ffc7b95ff59aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -105,7 +105,6 @@ protected void process() throws MetastoreNotificationException { infoLog("Partition list is empty. Ignoring this event."); return; } - // add meta cache Env.getCurrentEnv().getCatalogMgr() .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, eventTime, true); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 9b6cfc455b1d3d..ffce17806b7399 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -115,6 +115,9 @@ private void realRun() { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalog instanceof HMSExternalCatalog) { HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; + if (!hmsExternalCatalog.isEnableHmsEventsIncrementalSync()) { + continue; + } try { List events = getNextHMSEvents(hmsExternalCatalog); if (!events.isEmpty()) { @@ -208,10 +211,12 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal } LOG.info("(CYW)catalogname = {}, lastSyncedEventId = {}, currentEventId = {}", hmsExternalCatalog.getName(), lastSyncedEventId, currentEventId); + int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc() == -1 + ? Config.hms_events_batch_size_per_rpc + : hmsExternalCatalog.getHmsEventsBatchSizePerRpc(); try { NotificationEventResponse notificationEventResponse = - hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, - Config.hms_events_batch_size_per_rpc, null); + hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null); LOG.info("(CYW)catalogname = {} successs.event size = {} ", hmsExternalCatalog.getName(), notificationEventResponse.getEvents().size()); return notificationEventResponse; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index c8e0a45012e564..475bca44aed67f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -90,8 +90,12 @@ public Optional getMetaObjById(long id) { return name == null ? Optional.empty() : getMetaObj(name, id); } - public void setIdToName(long id, String name) { - idToName.put(id, name); + public void setIdToName(long id, String tableName) { + idToName.put(id, tableName); + } + + public void idToNameRemove(long id) { + idToName.remove(id); } public void updateCache(String objName, T obj) { @@ -121,6 +125,7 @@ public void invalidate(String objName) { public void invalidateAll() { namesCache.invalidateAll(); metaObjCache.invalidateAll(); + idToName.clear(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java index 050ed1d5414e8e..87c9d3b02db703 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java @@ -28,5 +28,6 @@ public class HMSProperties { // required public static final String HIVE_METASTORE_URIS = "hive.metastore.uris"; public static final List REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS); - + public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "enable_hms_events_incremental_sync"; + public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hms_events_batch_size_per_rpc"; } diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy index 34e2ffb4b4ae70..82e1a232044bfe 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy @@ -30,15 +30,28 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_hms_event_notification_${hivePrefix}" + String catalog_name_2 = "test_hms_event_notification_${hivePrefix}_2" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - int wait_time = 2000; + int wait_time = 10000; sql """drop catalog if exists ${catalog_name}""" sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', - "use_meta_cache" = "false" + "use_meta_cache" = "false", + "enable_hms_events_incremental_sync" ="true", + "hms_events_batch_size_per_rpc" = "10000" );""" + + sql """drop catalog if exists ${catalog_name_2}""" + sql """create catalog if not exists ${catalog_name_2} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "false", + "enable_hms_events_incremental_sync" ="true", + "hms_events_batch_size_per_rpc" = "100000" + );""" + sql """ switch ${catalog_name} """ String tb1 = """${catalog_name}_tb_1""" @@ -76,6 +89,20 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa } assertTrue(flag_db_count == 2); + sql """ switch ${catalog_name_2} """ + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + sql """ switch ${catalog_name} """ @@ -103,8 +130,19 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa loc_start = query_db2_location.indexOf("hdfs://") loc_end = query_db2_location.indexOf(".db") + 3 query_db2_location = query_db2_location.substring(loc_start, loc_end) + assertTrue(query_db2_location == new_db2_location); + + + sql """ switch ${catalog_name_2} """ + query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) assertTrue(query_db2_location == new_db2_location); + sql """ switch ${catalog_name} """ } @@ -124,6 +162,20 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa } assertTrue(flag_db_count == 1); + sql """ switch ${catalog_name_2} """ + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + sql """ switch ${catalog_name} """ //CREATE TABLE hive_docker """ use ${db1} """ @@ -152,6 +204,26 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(flag_tb_count == 2); + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + //ALTER TABLE List> ans = sql """ select * from ${tb1} """ logger.info("ans = ${ans}") @@ -165,6 +237,16 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(ans[0][0].toString() == "1") assertTrue(ans[0][1].toString() == "xxx") + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ hive_docker """ insert into ${tb1} values( 2,"yyy"); """ sleep(wait_time); @@ -185,6 +267,29 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(ans[1][0].toString() == "name") assertTrue(ans[1][1].toString() == "text") + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + hive_docker """ alter table ${tb1} change column id id bigint; """ sleep(wait_time); ans = sql """ desc ${tb1} """ @@ -203,6 +308,27 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(ans[1][1].toString() == "yyy") + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + hive_docker """ alter table ${tb1} change column name new_name string; """ sleep(wait_time); @@ -221,6 +347,28 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(ans[1][0].toString() == "2") assertTrue(ans[1][1].toString() == "yyy") + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + //DROP TABLE hive_docker """ drop table ${tb2} """ @@ -241,6 +389,26 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(flag_tb_count == 1); + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(""" tbs = ${tbs}""") + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + hive_docker """ drop table ${tb1} """ sleep(wait_time); @@ -258,6 +426,25 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa } } + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(""" tbs = ${tbs}""") + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + //ADD PARTITION hive_docker """ use ${db1} """ @@ -290,6 +477,22 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(ans[2][3].toString() == "India") assertTrue(ans[3][3].toString() == "India") + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + List> pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ logger.info("pars = ${pars}") @@ -304,6 +507,26 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa } assertTrue(flag_partition_count ==2) + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + hive_docker """ ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada'); @@ -325,6 +548,30 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(flag_partition_count ==3) + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + //ALTER PARTITION hive_docker """ alter table ${partition_tb} partition(country='USA') rename to partition(country='US') ; @@ -345,6 +592,30 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa } assertTrue(flag_partition_count ==3) + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + //DROP PARTITION hive_docker """ ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada'); @@ -367,7 +638,30 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa assertTrue(flag_partition_count ==2) + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + sql """drop catalog if exists ${catalog_name}""" + sql """drop catalog if exists ${catalog_name_2}""" } finally { } } From f6c3085a53f421a1ad6d93713bd753fd605d943a Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Thu, 15 Aug 2024 15:11:43 +0800 Subject: [PATCH 4/9] remove useless log --- .../apache/doris/datasource/CatalogMgr.java | 14 - .../doris/datasource/ExternalDatabase.java | 6 +- .../datasource/hive/HMSExternalCatalog.java | 5 +- .../datasource/hive/event/InsertEvent.java | 11 - .../hive/event/MetastoreEventFactory.java | 5 +- .../hive/event/MetastoreEventsProcessor.java | 16 +- .../doris/datasource/metacache/MetaCache.java | 14 +- .../hive/test_hms_event_notification.groovy | 389 ++++++++++++++++++ ...s_event_notification_multi_catalog.groovy} | 6 +- 9 files changed, 413 insertions(+), 53 deletions(-) create mode 100644 regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy rename regression-test/suites/external_table_p0/hive/{test_hms_event_notification_2.groovy => test_hms_event_notification_multi_catalog.groovy} (99%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index d251693d59520d..da92309828b916 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -655,13 +655,6 @@ public void registerExternalTableFromEvent(String dbName, String tableName, return; } - // TableIf table = db.getTableNullable(tableName); - // if (table != null) { - // if (!ignoreIfExists) { - // throw new DdlException("Table " + tableName + " has exist in db " + dbName); - // } - // return; - // } long tblId; HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; if (hmsCatalog.getUseMetaCache().get()) { @@ -712,13 +705,6 @@ public void registerExternalDatabaseFromEvent(String dbName, String catalogName, if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support create ExternalCatalog databases"); } - // DatabaseIf db = catalog.getDbNullable(dbName); - // if (db != null) { - // if (!ignoreIfExists) { - // throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName()); - // } - // return; - // } HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; long dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 2a7e42f769f948..ab5921aac0759d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -458,9 +458,8 @@ public void unregisterTable(String tableName) { if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { - metaCache.invalidate(tableName); + metaCache.invalidate(tableName, Util.genIdByName(getQualifiedName(tableName))); } - metaCache.idToNameRemove(Util.genIdByName(getQualifiedName(tableName))); } else { Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -490,9 +489,8 @@ public boolean registerTable(TableIf tableIf) { } if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { - metaCache.updateCache(tableName, (T) tableIf); + metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName))); } - metaCache.setIdToName(Util.genIdByName(getQualifiedName(tableName)), tableName); } else { tableNameToId.put(tableName, tableId); idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index c2fb27bc496517..e30ab1443a916c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -25,6 +25,7 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -222,7 +223,7 @@ public void unregisterDatabase(String dbName) { } if (useMetaCache.get()) { if (isInitialized()) { - metaCache.invalidate(dbName); + metaCache.invalidate(dbName, Util.genIdByName(getQualifiedName(dbName))); } } else { Long dbId = dbNameToId.remove(dbName); @@ -243,7 +244,7 @@ public void registerDatabase(long dbId, String dbName) { ExternalDatabase db = buildDbForInit(dbName, dbId, logType); if (useMetaCache.get()) { if (isInitialized()) { - metaCache.updateCache(dbName, db); + metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName))); } } else { dbNameToId.put(dbName, dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index fbd45575b88577..7b76d4913d51f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -// import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import java.util.List; @@ -32,13 +31,11 @@ * MetastoreEvent for INSERT event type */ public class InsertEvent extends MetastoreTableEvent { - // private final Table hmsTbl; // for test public InsertEvent(long eventId, String catalogName, String dbName, String tblName) { super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT); - // this.hmsTbl = null; } private InsertEvent(NotificationEvent event, String catalogName) { @@ -46,14 +43,6 @@ private InsertEvent(NotificationEvent event, String catalogName) { Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT)); Preconditions .checkNotNull(event.getMessage(), debugString("Event message is null")); - // try { - // InsertMessage insertMessage = - // MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) - // .getInsertMessage(event.getMessage()); - // // hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj()); - // } catch (Exception ex) { - // throw new MetastoreNotificationException(ex); - // } } protected static List getEvents(NotificationEvent event, String catalogName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index c0bd4eaef22061..7f697cf9738e13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -46,7 +46,9 @@ public List transferNotificationEventToMetastoreEvents(Notificat String catalogName) { Preconditions.checkNotNull(event.getEventType()); MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); - LOG.info("event = {}", event.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("catalogName = {}, Event = {}", catalogName, event.toString()); + } switch (metastoreEventType) { case CREATE_TABLE: return CreateTableEvent.getEvents(event, catalogName); @@ -80,7 +82,6 @@ List getMetastoreEvents(List events, HMSExter for (NotificationEvent event : events) { metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName)); } - //try catch ??? List mergedEvents = mergeEvents(catalogName, metastoreEvents); if (Env.getCurrentEnv().isMaster()) { logMetaIdMappings(hmsExternalCatalog.getId(), events.get(events.size() - 1).getEventId(), mergedEvents); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index ffce17806b7399..91cf119eb251f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -128,6 +128,7 @@ private void realRun() { } catch (MetastoreNotificationFetchException e) { LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); } catch (Exception ex) { + hmsExternalCatalog.onRefresh(true); LOG.warn("Failed to process hive metastore [{}] events .", hmsExternalCatalog.getName(), ex); } @@ -187,8 +188,8 @@ private void processEvents(List events, HMSExternalCatalog hm private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog) throws MetastoreNotificationFetchException { - long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); //165 - long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); //166 + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); + long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); if (lastSyncedEventId < 0) { refreshCatalogForMaster(hmsExternalCatalog); // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events @@ -209,16 +210,17 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); return null; } - LOG.info("(CYW)catalogname = {}, lastSyncedEventId = {}, currentEventId = {}", - hmsExternalCatalog.getName(), lastSyncedEventId, currentEventId); + int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc() == -1 ? Config.hms_events_batch_size_per_rpc : hmsExternalCatalog.getHmsEventsBatchSizePerRpc(); try { NotificationEventResponse notificationEventResponse = hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null); - LOG.info("(CYW)catalogname = {} successs.event size = {} ", - hmsExternalCatalog.getName(), notificationEventResponse.getEvents().size()); + LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId = {}," + + "batchSize = {}, getEventsSize = {}", hmsExternalCatalog.getName(), lastSyncedEventId, + currentEventId, batchSize, notificationEventResponse.getEvents().size()); + return notificationEventResponse; } catch (MetastoreNotificationFetchException e) { // Need a fallback to handle this because this error state can not be recovered until restarting FE @@ -226,7 +228,7 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { refreshCatalogForMaster(hmsExternalCatalog); // set lastSyncedEventId to currentEventId after refresh catalog successfully - updateLastSyncedEventId(hmsExternalCatalog, currentEventId); //TODO ??? may be not right + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); LOG.warn("Notification events are missing, maybe an event can not be handled " + "or processing rate is too low, fallback to refresh the catalog"); return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index 475bca44aed67f..6e4198186e82e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -90,15 +90,7 @@ public Optional getMetaObjById(long id) { return name == null ? Optional.empty() : getMetaObj(name, id); } - public void setIdToName(long id, String tableName) { - idToName.put(id, tableName); - } - - public void idToNameRemove(long id) { - idToName.remove(id); - } - - public void updateCache(String objName, T obj) { + public void updateCache(String objName, T obj, long id) { metaObjCache.put(objName, Optional.of(obj)); namesCache.asMap().compute("", (k, v) -> { if (v == null) { @@ -108,9 +100,10 @@ public void updateCache(String objName, T obj) { return v; } }); + idToName.put(id, objName); } - public void invalidate(String objName) { + public void invalidate(String objName, long id) { namesCache.asMap().compute("", (k, v) -> { if (v == null) { return Lists.newArrayList(); @@ -120,6 +113,7 @@ public void invalidate(String objName) { } }); metaObjCache.invalidate(objName); + idToName.remove(id); } public void invalidateAll() { diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy new file mode 100644 index 00000000000000..4167fb761a55dc --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hms_event_notification", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + + for (String hivePrefix : [ "hive2","hive3"]) { + try { + setHivePrefix(hivePrefix) + hive_docker """ set hive.stats.autogather=false; """ + + + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "test_hms_event_notification_${hivePrefix}" + String catalog_name_2 = "test_hms_event_notification_${hivePrefix}_2" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + int wait_time = 10000; + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "false", + "enable_hms_events_incremental_sync" ="true", + "hms_events_batch_size_per_rpc" = "1000" + );""" + + sql """create catalog if not exists ${catalog_name_2} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "false", + "enable_hms_events_incremental_sync" ="true" + );""" + + sql """ switch ${catalog_name} """ + + String tb1 = """${catalog_name}_tb_1""" + String tb2 = """${catalog_name}_tb_2""" + String db1 = "${catalog_name}_db_1"; + String db2 = "${catalog_name}_db_2"; + String partition_tb = "${catalog_name}_partition_tb"; + + try { + hive_docker """ use ${db1};""" + }catch (Exception e){ + } + + hive_docker """ drop table if exists ${tb1};""" + hive_docker """ drop table if exists ${tb2};""" + hive_docker """ drop table if exists ${partition_tb} """ + hive_docker """ drop database if exists ${db1};""" + hive_docker """ drop database if exists ${db2};""" + +//CREATE DATABASE + hive_docker """ create database ${db1};""" + hive_docker """ create database ${db2};""" + sleep(wait_time); + + List> dbs = sql """ show databases """ + logger.info("result = " + dbs); + + int flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + + + +//ALTER DATABASE + if (hivePrefix == "hive3") { + String db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("db2 location = " + db2_location ) + + def loc_start = db2_location.indexOf("hdfs://") + def loc_end = db2_location.indexOf(".db") + 3 + db2_location = db2_location.substring(loc_start, loc_end) + logger.info("db2 location = " + db2_location ) + + String new_db2_location = db2_location.replace("warehouse", "new_warehouse_xxx") + logger.info("change db2 location to ${new_db2_location} ") + + logger.info(" alter database begin") + hive_docker """ ALTER DATABASE ${db2} SET LOCATION '${new_db2_location}'; """ + logger.info(" alter database end") + sleep(wait_time); + + String query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) + + assertTrue(query_db2_location == new_db2_location); + } + + +//DROP DATABASE + hive_docker """drop database ${db2}; """; + sleep(wait_time); + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + + +//CREATE TABLE + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + List> tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + assertTrue(tbs.isEmpty()) + + + hive_docker """ create table ${tb1} (id int,name string) ;""" + hive_docker """ create table ${tb2} (id int,name string) ;""" + sleep(wait_time); + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + int flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + + +//ALTER TABLE + List> ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.isEmpty()) + + hive_docker """ insert into ${tb1} select 1,"xxx"; """ + sleep(wait_time); + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + + hive_docker """ insert into ${tb1} values( 2,"yyy"); """ + sleep(wait_time); + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + hive_docker """ alter table ${tb1} change column id id bigint; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + + hive_docker """ alter table ${tb1} change column name new_name string; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + +//DROP TABLE + hive_docker """ drop table ${tb2} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb1}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + + + hive_docker """ drop table ${tb1} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + +//ADD PARTITION + + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + + hive_docker """ CREATE TABLE ${partition_tb} ( + id INT, + name STRING, + age INT + ) + PARTITIONED BY (country STRING); """ + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='USA') + VALUES (1, 'John Doe', 30), + (2, 'Jane Smith', 25);""" + + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='India') + VALUES (3, 'Rahul Kumar', 28), + (4, 'Priya Singh', 24); + """ + sleep(wait_time); + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + + List> pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + int flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + + hive_docker """ + ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + +//ALTER PARTITION + hive_docker """ + alter table ${partition_tb} partition(country='USA') rename to partition(country='US') ; + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + +//DROP PARTITION + hive_docker """ + ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + + + sql """drop catalog if exists ${catalog_name}""" + } finally { + } + } +} + + + + diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy similarity index 99% rename from regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy rename to regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy index 82e1a232044bfe..c058ed7a588e18 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_2.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_hms_event_notification_2", "p0,external,hive,external_docker,external_docker_hive") { +suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_docker,external_docker_hive") { String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("diable Hive test.") @@ -29,8 +29,8 @@ suite("test_hms_event_notification_2", "p0,external,hive,external_docker,externa String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") - String catalog_name = "test_hms_event_notification_${hivePrefix}" - String catalog_name_2 = "test_hms_event_notification_${hivePrefix}_2" + String catalog_name = "test_hms_event_notification_multi_catalog_${hivePrefix}" + String catalog_name_2 = "test_hms_event_notification_multi_catalog_${hivePrefix}_2" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") int wait_time = 10000; From cd12524eecbd81bdaf2bb0b92595d6767f5484a7 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 16 Aug 2024 11:32:26 +0800 Subject: [PATCH 5/9] fix case error. --- .../doris/datasource/hive/event/MetastoreEventsProcessor.java | 1 + .../suites/external_table_p0/hive/ddl/test_hive_ctas.groovy | 2 +- .../external_table_p0/hive/write/test_hive_write_insert.groovy | 2 +- .../hive/write/test_hive_write_partitions.groovy | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 91cf119eb251f3..cffb3982fb14e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -129,6 +129,7 @@ private void realRun() { LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); } catch (Exception ex) { hmsExternalCatalog.onRefresh(true); + updateLastSyncedEventId(hmsExternalCatalog, -1); LOG.warn("Failed to process hive metastore [{}] events .", hmsExternalCatalog.getName(), ex); } diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy index deebb781f19903..265d200984ed58 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy @@ -22,7 +22,7 @@ suite("test_hive_ctas", "p0,external,hive,external_docker,external_docker_hive") return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : [ "hive3"]) { def file_formats = ["parquet", "orc"] setHivePrefix(hivePrefix) def generateSrcDDLForCTAS = { String file_format, String catalog_name -> diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy index 0b6fab86b2ba1a..087b797faaf9d2 100644 --- a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy +++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy @@ -880,7 +880,7 @@ INSERT INTO all_types_par_${format_compression}_${catalog_name}_q03 return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : ["hive3"]) { setHivePrefix(hivePrefix) try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy index cd0533d00d9d32..7e3f070636ec74 100644 --- a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy +++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy @@ -195,7 +195,7 @@ suite("test_hive_write_partitions", "p0,external,hive,external_docker,external_d return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : ["hive3"]) { setHivePrefix(hivePrefix) try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") From 1d3e37e2953314390d33009bbec00d1cb70064ec Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 16 Aug 2024 14:26:30 +0800 Subject: [PATCH 6/9] fix case. --- .../external_table_p0/hive/test_hms_event_notification.groovy | 2 ++ .../hive/test_hms_event_notification_multi_catalog.groovy | 1 + 2 files changed, 3 insertions(+) diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy index 4167fb761a55dc..fed2205db89036 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy @@ -49,6 +49,8 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ "use_meta_cache" = "false", "enable_hms_events_incremental_sync" ="true" );""" + + sleep(wait_time); sql """ switch ${catalog_name} """ diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy index c058ed7a588e18..c9ef63be550205 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy @@ -51,6 +51,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do "enable_hms_events_incremental_sync" ="true", "hms_events_batch_size_per_rpc" = "100000" );""" + sleep(wait_time); sql """ switch ${catalog_name} """ From 6f4acabace77a0d6ef24731b29e3ed387c9f9384 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sat, 17 Aug 2024 23:56:05 +0800 Subject: [PATCH 7/9] fix comment --- .../doris/datasource/ExternalCatalog.java | 10 +++++----- .../doris/datasource/ExternalDatabase.java | 6 ++++-- .../datasource/hive/HMSExternalCatalog.java | 17 ++++++++++++----- .../datasource/hive/event/IgnoredEvent.java | 2 +- .../datasource/hive/event/MetastoreEvent.java | 12 ++++++++++++ .../hive/event/MetastoreEventsProcessor.java | 6 ++---- .../hive/test_hms_event_notification.groovy | 7 ++++--- ..._hms_event_notification_multi_catalog.groovy | 7 +++++-- 8 files changed, 45 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index d99ac76c7b9f13..a8170e837770e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -291,11 +291,11 @@ public void checkProperties() throws DdlException { } } - if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) { - LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name); - getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true"); - useMetaCache = Optional.of(true); - } + // if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) { + // LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name); + // getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true"); + // useMetaCache = Optional.of(true); + // } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index ab5921aac0759d..d653a5a178e484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -492,8 +492,10 @@ public boolean registerTable(TableIf tableIf) { metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName))); } } else { - tableNameToId.put(tableName, tableId); - idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); + if (!tableNameToId.containsKey(tableName)) { + tableNameToId.put(tableName, tableId); + idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); + } } setLastUpdateTime(System.currentTimeMillis()); return true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index e30ab1443a916c..5faf1f2bb6e723 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -105,12 +105,19 @@ public void checkProperties() throws DdlException { throw new DdlException( "The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond); } + Map properties = catalogProperty.getProperties(); + if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) { + enableHmsEventsIncrementalSync = + properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true"); + } else { + enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync; + } - enableHmsEventsIncrementalSync = - catalogProperty.getOrDefault(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC, "false").equals("true"); - - hmsEventsBatchSizePerRpc = - Integer.valueOf(catalogProperty.getOrDefault(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC, "-1")); + if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) { + hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)); + } else { + hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc; + } // check the dfs.ha properties // 'dfs.nameservices'='your-nameservice', diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java index d504c2917f9d02..e7e6643e647b98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java @@ -28,7 +28,7 @@ */ public class IgnoredEvent extends MetastoreEvent { private IgnoredEvent(NotificationEvent event, String catalogName) { - super(event, catalogName); + super(event); } protected static List getEvents(NotificationEvent event, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 04b0ccab799b81..695dd57b215072 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -78,6 +78,18 @@ protected MetastoreEvent(long eventId, String catalogName, String dbName, this.event = null; } + // for IgnoredEvent + protected MetastoreEvent(NotificationEvent event) { + this.event = event; + this.metastoreNotificationEvent = event; + this.eventId = -1; + this.eventTime = -1L; + this.catalogName = null; + this.dbName = null; + this.tblName = null; + this.eventType = null; + } + protected MetastoreEvent(NotificationEvent event, String catalogName) { this.event = event; // Some events that we don't care about, dbName may be empty diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index cffb3982fb14e5..cbd0bfb5fa6fb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -128,7 +128,7 @@ private void realRun() { } catch (MetastoreNotificationFetchException e) { LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); } catch (Exception ex) { - hmsExternalCatalog.onRefresh(true); + hmsExternalCatalog.onRefreshCache(true); updateLastSyncedEventId(hmsExternalCatalog, -1); LOG.warn("Failed to process hive metastore [{}] events .", hmsExternalCatalog.getName(), ex); @@ -212,9 +212,7 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal return null; } - int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc() == -1 - ? Config.hms_events_batch_size_per_rpc - : hmsExternalCatalog.getHmsEventsBatchSizePerRpc(); + int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc(); try { NotificationEventResponse notificationEventResponse = hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null); diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy index fed2205db89036..c3019455bab951 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy @@ -21,7 +21,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ logger.info("diable Hive test.") return; } - + for (String useMetaCache : ["true","false"] ) { for (String hivePrefix : [ "hive2","hive3"]) { try { setHivePrefix(hivePrefix) @@ -38,7 +38,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', - "use_meta_cache" = "false", + "use_meta_cache" = "${useMetaCache}", "enable_hms_events_incremental_sync" ="true", "hms_events_batch_size_per_rpc" = "1000" );""" @@ -46,7 +46,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ sql """create catalog if not exists ${catalog_name_2} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', - "use_meta_cache" = "false", + "use_meta_cache" = "${useMetaCache}", "enable_hms_events_incremental_sync" ="true" );""" @@ -383,6 +383,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ sql """drop catalog if exists ${catalog_name}""" } finally { } + } } } diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy index c9ef63be550205..fa83ab24b2ab82 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy @@ -22,6 +22,8 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do return; } + for (String useMetaCache : ["true","false"] ) { + for (String hivePrefix : [ "hive2","hive3"]) { try { setHivePrefix(hivePrefix) @@ -38,7 +40,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', - "use_meta_cache" = "false", + "use_meta_cache" = "${useMetaCache}", "enable_hms_events_incremental_sync" ="true", "hms_events_batch_size_per_rpc" = "10000" );""" @@ -47,7 +49,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do sql """create catalog if not exists ${catalog_name_2} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', - "use_meta_cache" = "false", + "use_meta_cache" = "${useMetaCache}", "enable_hms_events_incremental_sync" ="true", "hms_events_batch_size_per_rpc" = "100000" );""" @@ -666,6 +668,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do } finally { } } + } } From 2a5dfac1cc76d982042d4c324b1ac4430638a245 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sun, 18 Aug 2024 12:01:34 +0800 Subject: [PATCH 8/9] fix fe.conf --- fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java | 5 ++--- regression-test/pipeline/external/conf/fe.conf | 2 -- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 7ad40455c8507b..f9e8a0350924d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1836,9 +1836,8 @@ protected void startNonMasterDaemonThreads() { domainResolver.start(); // fe disk updater feDiskUpdater.start(); - if (Config.enable_hms_events_incremental_sync) { - metastoreEventsProcessor.start(); - } + + metastoreEventsProcessor.start(); dnsCache.start(); diff --git a/regression-test/pipeline/external/conf/fe.conf b/regression-test/pipeline/external/conf/fe.conf index b8ed430128b3f1..df6fb86535c84d 100644 --- a/regression-test/pipeline/external/conf/fe.conf +++ b/regression-test/pipeline/external/conf/fe.conf @@ -96,9 +96,7 @@ auth_token = 5ff161c3-2c08-4079-b108-26c8850b6598 infodb_support_ext_catalog=true trino_connector_plugin_dir=/tmp/trino_connector/connectors -enable_hms_events_incremental_sync=true hms_events_polling_interval_ms=2000 -hms_events_batch_size_per_rpc=10000 KRB5_CONFIG=/keytabs/krb5.conf From b428fa7ff640ac63a2e450ea296ebbfb38f4ae08 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Wed, 21 Aug 2024 10:30:10 +0800 Subject: [PATCH 9/9] Parameter rename --- .../datasource/property/constants/HMSProperties.java | 4 ++-- .../hive/test_hms_event_notification.groovy | 6 +++--- .../hive/test_hms_event_notification_multi_catalog.groovy | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java index 87c9d3b02db703..81baf042faed37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java @@ -28,6 +28,6 @@ public class HMSProperties { // required public static final String HIVE_METASTORE_URIS = "hive.metastore.uris"; public static final List REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS); - public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "enable_hms_events_incremental_sync"; - public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hms_events_batch_size_per_rpc"; + public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync"; + public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc"; } diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy index c3019455bab951..52724b807d38c7 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy @@ -39,15 +39,15 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_ "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', "use_meta_cache" = "${useMetaCache}", - "enable_hms_events_incremental_sync" ="true", - "hms_events_batch_size_per_rpc" = "1000" + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "1000" );""" sql """create catalog if not exists ${catalog_name_2} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', "use_meta_cache" = "${useMetaCache}", - "enable_hms_events_incremental_sync" ="true" + "hive.enable_hms_events_incremental_sync" ="true" );""" sleep(wait_time); diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy index fa83ab24b2ab82..24c2ac3b7fb907 100644 --- a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy @@ -41,8 +41,8 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', "use_meta_cache" = "${useMetaCache}", - "enable_hms_events_incremental_sync" ="true", - "hms_events_batch_size_per_rpc" = "10000" + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "10000" );""" sql """drop catalog if exists ${catalog_name_2}""" @@ -50,8 +50,8 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', "use_meta_cache" = "${useMetaCache}", - "enable_hms_events_incremental_sync" ="true", - "hms_events_batch_size_per_rpc" = "100000" + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "100000" );""" sleep(wait_time);