From 759e776f9b7e10875b8e51bb388c566463dde84f Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Wed, 17 Dec 2025 14:40:32 +0800 Subject: [PATCH] [fix](catalog) Fix lastUpdateTime not updated on refresh and add scheduled refresh logs (#58997) Fix `lastUpdateTime` not being updated when catalog is refreshed (manual or scheduled). Previously it was only set during initialization, now it reflects the actual last refresh time. Also added info logs for scheduled catalog refresh to improve observability: - Log when catalog is added/removed from refresh map - Log when scheduled refresh is triggered and completed - Log catalog registration on FE startup --- .../apache/doris/catalog/RefreshManager.java | 13 ++- .../apache/doris/datasource/CatalogMgr.java | 19 +++- .../doris/datasource/ExternalCatalog.java | 1 + .../doris/datasource/RefreshCatalogTest.java | 13 ++- ...st_jdbc_catalog_refresh_update_time.groovy | 104 ++++++++++++++++++ 5 files changed, 140 insertions(+), 10 deletions(-) create mode 100644 regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 9738ee39355ada..2802055a8873c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -272,10 +272,12 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam } public void addToRefreshMap(long catalogId, Integer[] sec) { + LOG.info("Add catalog id={} to scheduled refresh map, interval={}s", catalogId, sec[0]); refreshMap.put(catalogId, sec); } public void removeFromRefreshMap(long catalogId) { + LOG.info("Remove catalog (id={}) from scheduled refresh map", catalogId); refreshMap.remove(catalogId); } @@ -300,6 +302,8 @@ public void run() { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalog != null) { String catalogName = catalog.getName(); + LOG.info("Scheduled refresh triggered for catalog {} (id={}), interval={}s, invalidCache=true", + catalogName, catalogId, original); /** * Now do not invoke * {@link org.apache.doris.analysis.RefreshCatalogStmt#analyze(Analyzer)} is ok, @@ -307,13 +311,20 @@ public void run() { * */ try { Env.getCurrentEnv().getRefreshManager().handleRefreshCatalog(catalogName, true); + LOG.info("Scheduled refresh completed for catalog {} (id={}), next refresh in {}s", + catalogName, catalogId, original); } catch (Exception e) { - LOG.warn("failed to refresh catalog {}", catalogName, e); + LOG.warn("Failed to execute scheduled refresh for catalog {} (id={})", + catalogName, catalogId, e); } // reset timeGroup[1] = original; refreshMap.put(catalogId, timeGroup); + } else { + LOG.warn("Scheduled refresh skipped: catalog id={} not found, removing from refresh map", + catalogId); + refreshMap.remove(catalogId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index f5b1d30eadbc27..692d32c5931afc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -761,14 +761,27 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab } public void registerCatalogRefreshListener(Env env) { + int registeredCount = 0; for (CatalogIf catalog : idToCatalog.values()) { Map properties = catalog.getProperties(); if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) { - Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC)); - Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; - env.getRefreshManager().addToRefreshMap(catalog.getId(), sec); + try { + Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC)); + LOG.info("Registering scheduled refresh for catalog {} (id={}), type={}, interval={}s", + catalog.getName(), catalog.getId(), catalog.getType(), metadataRefreshIntervalSec); + Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; + env.getRefreshManager().addToRefreshMap(catalog.getId(), sec); + registeredCount++; + } catch (Exception e) { + LOG.warn("Failed to register scheduled refresh for catalog {} (id={}), " + + "invalid {} value: {}", + catalog.getName(), catalog.getId(), METADATA_REFRESH_INTERVAL_SEC, + properties.get(METADATA_REFRESH_INTERVAL_SEC), e); + } } } + LOG.info("Finished registering catalog refresh listeners, {} catalogs with scheduled refresh enabled", + registeredCount); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 6878d3e9044df3..629af2c2e26a36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -542,6 +542,7 @@ public synchronized void resetToUninitialized(boolean invalidCache) { * @param invalidCache */ public void onRefreshCache(boolean invalidCache) { + setLastUpdateTime(System.currentTimeMillis()); refreshMetaCacheOnly(); if (invalidCache) { Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 3431225788d13c..306ea1284dab2b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -137,17 +137,18 @@ public void testRefreshCatalog() throws Exception { @Test public void testRefreshCatalogLastUpdateTime() throws Exception { CatalogIf test2 = env.getCatalogMgr().getCatalog("test2"); - // init is 0 + // lastUpdateTime is set when catalog is created (via resetToUninitialized -> onRefreshCache) long l1 = test2.getLastUpdateTime(); - Assertions.assertTrue(l1 == 0); + Assertions.assertTrue(l1 > 0); TestExternalTable table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); - // getDb() triggered init method + // getDb() triggered init method, but lastUpdateTime was already set long l2 = test2.getLastUpdateTime(); - Assertions.assertTrue(l2 > l1); + Assertions.assertTrue(l2 >= l1); Assertions.assertFalse(table.isObjectCreated()); table.makeSureInitialized(); Assertions.assertTrue(table.isObjectCreated()); + Thread.sleep(100); // wait a bit to ensure time difference RefreshCatalogCommand refreshCatalogCommand = new RefreshCatalogCommand("test2", null); Assertions.assertTrue(refreshCatalogCommand.isInvalidCache()); try { @@ -155,9 +156,9 @@ public void testRefreshCatalogLastUpdateTime() throws Exception { } catch (Exception e) { // Do nothing } - // not triggered init method + // refresh should update lastUpdateTime long l3 = test2.getLastUpdateTime(); - Assertions.assertTrue(l3 == l2); + Assertions.assertTrue(l3 > l2); // the table will be recreated after refresh. // so we need to get table again table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy new file mode 100644 index 00000000000000..ff2d6cda08e6c6 --- /dev/null +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_jdbc_catalog_refresh_update_time", "p0,external,mysql,external_docker,external_docker_mysql") { + String enabled = context.config.otherConfigs.get("enableJdbcTest") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + // Helper function to get LastUpdateTime for a catalog + // Column index: 0=CatalogId, 1=Name, 2=Type, 3=IsCurrent, 4=CreateTime, 5=LastUpdateTime + def getLastUpdateTime = { catalogName -> + def catalogs = sql """show catalogs""" + for (row in catalogs) { + if (row[1] == catalogName) { + return row[5] + } + } + return null + } + + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String catalog_name = "test_refresh_update_time_catalog" + + sql """drop catalog if exists ${catalog_name}""" + + // Test 1: Manual refresh should update LastUpdateTime + sql """create catalog if not exists ${catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver" + );""" + + // Get initial LastUpdateTime + String initialUpdateTime = getLastUpdateTime(catalog_name) + assertNotNull(initialUpdateTime, "Catalog ${catalog_name} should exist") + + // Wait a bit to ensure time difference + Thread.sleep(2000) + + // Manual refresh + sql """refresh catalog ${catalog_name}""" + + // Get LastUpdateTime after refresh + String afterRefreshUpdateTime = getLastUpdateTime(catalog_name) + + // Verify LastUpdateTime changed after manual refresh + assertTrue(afterRefreshUpdateTime != initialUpdateTime, + "LastUpdateTime should change after manual refresh. Initial: ${initialUpdateTime}, After: ${afterRefreshUpdateTime}") + + sql """drop catalog if exists ${catalog_name}""" + + // Test 2: Scheduled refresh should update LastUpdateTime + String scheduled_catalog_name = "test_scheduled_refresh_catalog" + sql """drop catalog if exists ${scheduled_catalog_name}""" + + sql """create catalog if not exists ${scheduled_catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "metadata_refresh_interval_sec" = "5" + );""" + + // Get initial LastUpdateTime + String scheduledInitialTime = getLastUpdateTime(scheduled_catalog_name) + assertNotNull(scheduledInitialTime, "Catalog ${scheduled_catalog_name} should exist") + + // Wait for scheduled refresh (interval is 5 seconds, wait 8 seconds to be safe) + Thread.sleep(8000) + + // Get LastUpdateTime after scheduled refresh + String scheduledAfterRefreshTime = getLastUpdateTime(scheduled_catalog_name) + + // Verify LastUpdateTime changed after scheduled refresh + assertTrue(scheduledAfterRefreshTime != scheduledInitialTime, + "LastUpdateTime should change after scheduled refresh. Initial: ${scheduledInitialTime}, After: ${scheduledAfterRefreshTime}") + + sql """drop catalog if exists ${scheduled_catalog_name}""" +}