diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 9738ee39355ada..2802055a8873c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -272,10 +272,12 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam } public void addToRefreshMap(long catalogId, Integer[] sec) { + LOG.info("Add catalog id={} to scheduled refresh map, interval={}s", catalogId, sec[0]); refreshMap.put(catalogId, sec); } public void removeFromRefreshMap(long catalogId) { + LOG.info("Remove catalog (id={}) from scheduled refresh map", catalogId); refreshMap.remove(catalogId); } @@ -300,6 +302,8 @@ public void run() { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalog != null) { String catalogName = catalog.getName(); + LOG.info("Scheduled refresh triggered for catalog {} (id={}), interval={}s, invalidCache=true", + catalogName, catalogId, original); /** * Now do not invoke * {@link org.apache.doris.analysis.RefreshCatalogStmt#analyze(Analyzer)} is ok, @@ -307,13 +311,20 @@ public void run() { * */ try { Env.getCurrentEnv().getRefreshManager().handleRefreshCatalog(catalogName, true); + LOG.info("Scheduled refresh completed for catalog {} (id={}), next refresh in {}s", + catalogName, catalogId, original); } catch (Exception e) { - LOG.warn("failed to refresh catalog {}", catalogName, e); + LOG.warn("Failed to execute scheduled refresh for catalog {} (id={})", + catalogName, catalogId, e); } // reset timeGroup[1] = original; refreshMap.put(catalogId, timeGroup); + } else { + LOG.warn("Scheduled refresh skipped: catalog id={} not found, removing from refresh map", + catalogId); + refreshMap.remove(catalogId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index f5b1d30eadbc27..692d32c5931afc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -761,14 +761,27 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab } public void registerCatalogRefreshListener(Env env) { + int registeredCount = 0; for (CatalogIf catalog : idToCatalog.values()) { Map properties = catalog.getProperties(); if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) { - Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC)); - Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; - env.getRefreshManager().addToRefreshMap(catalog.getId(), sec); + try { + Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC)); + LOG.info("Registering scheduled refresh for catalog {} (id={}), type={}, interval={}s", + catalog.getName(), catalog.getId(), catalog.getType(), metadataRefreshIntervalSec); + Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; + env.getRefreshManager().addToRefreshMap(catalog.getId(), sec); + registeredCount++; + } catch (Exception e) { + LOG.warn("Failed to register scheduled refresh for catalog {} (id={}), " + + "invalid {} value: {}", + catalog.getName(), catalog.getId(), METADATA_REFRESH_INTERVAL_SEC, + properties.get(METADATA_REFRESH_INTERVAL_SEC), e); + } } } + LOG.info("Finished registering catalog refresh listeners, {} catalogs with scheduled refresh enabled", + registeredCount); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 2030fa8f4be687..3e347c9d353687 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -542,6 +542,7 @@ public synchronized void resetToUninitialized(boolean invalidCache) { * @param invalidCache */ public void onRefreshCache(boolean invalidCache) { + setLastUpdateTime(System.currentTimeMillis()); refreshMetaCacheOnly(); if (invalidCache) { Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 3431225788d13c..306ea1284dab2b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -137,17 +137,18 @@ public void testRefreshCatalog() throws Exception { @Test public void testRefreshCatalogLastUpdateTime() throws Exception { CatalogIf test2 = env.getCatalogMgr().getCatalog("test2"); - // init is 0 + // lastUpdateTime is set when catalog is created (via resetToUninitialized -> onRefreshCache) long l1 = test2.getLastUpdateTime(); - Assertions.assertTrue(l1 == 0); + Assertions.assertTrue(l1 > 0); TestExternalTable table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); - // getDb() triggered init method + // getDb() triggered init method, but lastUpdateTime was already set long l2 = test2.getLastUpdateTime(); - Assertions.assertTrue(l2 > l1); + Assertions.assertTrue(l2 >= l1); Assertions.assertFalse(table.isObjectCreated()); table.makeSureInitialized(); Assertions.assertTrue(table.isObjectCreated()); + Thread.sleep(100); // wait a bit to ensure time difference RefreshCatalogCommand refreshCatalogCommand = new RefreshCatalogCommand("test2", null); Assertions.assertTrue(refreshCatalogCommand.isInvalidCache()); try { @@ -155,9 +156,9 @@ public void testRefreshCatalogLastUpdateTime() throws Exception { } catch (Exception e) { // Do nothing } - // not triggered init method + // refresh should update lastUpdateTime long l3 = test2.getLastUpdateTime(); - Assertions.assertTrue(l3 == l2); + Assertions.assertTrue(l3 > l2); // the table will be recreated after refresh. // so we need to get table again table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy new file mode 100644 index 00000000000000..ff2d6cda08e6c6 --- /dev/null +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_jdbc_catalog_refresh_update_time", "p0,external,mysql,external_docker,external_docker_mysql") { + String enabled = context.config.otherConfigs.get("enableJdbcTest") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + // Helper function to get LastUpdateTime for a catalog + // Column index: 0=CatalogId, 1=Name, 2=Type, 3=IsCurrent, 4=CreateTime, 5=LastUpdateTime + def getLastUpdateTime = { catalogName -> + def catalogs = sql """show catalogs""" + for (row in catalogs) { + if (row[1] == catalogName) { + return row[5] + } + } + return null + } + + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String catalog_name = "test_refresh_update_time_catalog" + + sql """drop catalog if exists ${catalog_name}""" + + // Test 1: Manual refresh should update LastUpdateTime + sql """create catalog if not exists ${catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver" + );""" + + // Get initial LastUpdateTime + String initialUpdateTime = getLastUpdateTime(catalog_name) + assertNotNull(initialUpdateTime, "Catalog ${catalog_name} should exist") + + // Wait a bit to ensure time difference + Thread.sleep(2000) + + // Manual refresh + sql """refresh catalog ${catalog_name}""" + + // Get LastUpdateTime after refresh + String afterRefreshUpdateTime = getLastUpdateTime(catalog_name) + + // Verify LastUpdateTime changed after manual refresh + assertTrue(afterRefreshUpdateTime != initialUpdateTime, + "LastUpdateTime should change after manual refresh. Initial: ${initialUpdateTime}, After: ${afterRefreshUpdateTime}") + + sql """drop catalog if exists ${catalog_name}""" + + // Test 2: Scheduled refresh should update LastUpdateTime + String scheduled_catalog_name = "test_scheduled_refresh_catalog" + sql """drop catalog if exists ${scheduled_catalog_name}""" + + sql """create catalog if not exists ${scheduled_catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "metadata_refresh_interval_sec" = "5" + );""" + + // Get initial LastUpdateTime + String scheduledInitialTime = getLastUpdateTime(scheduled_catalog_name) + assertNotNull(scheduledInitialTime, "Catalog ${scheduled_catalog_name} should exist") + + // Wait for scheduled refresh (interval is 5 seconds, wait 8 seconds to be safe) + Thread.sleep(8000) + + // Get LastUpdateTime after scheduled refresh + String scheduledAfterRefreshTime = getLastUpdateTime(scheduled_catalog_name) + + // Verify LastUpdateTime changed after scheduled refresh + assertTrue(scheduledAfterRefreshTime != scheduledInitialTime, + "LastUpdateTime should change after scheduled refresh. Initial: ${scheduledInitialTime}, After: ${scheduledAfterRefreshTime}") + + sql """drop catalog if exists ${scheduled_catalog_name}""" +}