Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam
}

public void addToRefreshMap(long catalogId, Integer[] sec) {
LOG.info("Add catalog id={} to scheduled refresh map, interval={}s", catalogId, sec[0]);
refreshMap.put(catalogId, sec);
}

public void removeFromRefreshMap(long catalogId) {
LOG.info("Remove catalog (id={}) from scheduled refresh map", catalogId);
refreshMap.remove(catalogId);
}

Expand All @@ -300,20 +302,29 @@ public void run() {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog != null) {
String catalogName = catalog.getName();
LOG.info("Scheduled refresh triggered for catalog {} (id={}), interval={}s, invalidCache=true",
catalogName, catalogId, original);
/**
* Now do not invoke
* {@link org.apache.doris.analysis.RefreshCatalogStmt#analyze(Analyzer)} is ok,
* because the default value of invalidCache is true.
* */
try {
Env.getCurrentEnv().getRefreshManager().handleRefreshCatalog(catalogName, true);
LOG.info("Scheduled refresh completed for catalog {} (id={}), next refresh in {}s",
catalogName, catalogId, original);
} catch (Exception e) {
LOG.warn("failed to refresh catalog {}", catalogName, e);
LOG.warn("Failed to execute scheduled refresh for catalog {} (id={})",
catalogName, catalogId, e);
}

// reset
timeGroup[1] = original;
refreshMap.put(catalogId, timeGroup);
} else {
LOG.warn("Scheduled refresh skipped: catalog id={} not found, removing from refresh map",
catalogId);
refreshMap.remove(catalogId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,14 +761,27 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab
}

public void registerCatalogRefreshListener(Env env) {
int registeredCount = 0;
for (CatalogIf catalog : idToCatalog.values()) {
Map<String, String> properties = catalog.getProperties();
if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC));
Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec};
env.getRefreshManager().addToRefreshMap(catalog.getId(), sec);
try {
Integer metadataRefreshIntervalSec = Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC));
LOG.info("Registering scheduled refresh for catalog {} (id={}), type={}, interval={}s",
catalog.getName(), catalog.getId(), catalog.getType(), metadataRefreshIntervalSec);
Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec};
env.getRefreshManager().addToRefreshMap(catalog.getId(), sec);
registeredCount++;
} catch (Exception e) {
LOG.warn("Failed to register scheduled refresh for catalog {} (id={}), "
+ "invalid {} value: {}",
catalog.getName(), catalog.getId(), METADATA_REFRESH_INTERVAL_SEC,
properties.get(METADATA_REFRESH_INTERVAL_SEC), e);
}
}
}
LOG.info("Finished registering catalog refresh listeners, {} catalogs with scheduled refresh enabled",
registeredCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ public synchronized void resetToUninitialized(boolean invalidCache) {
* @param invalidCache
*/
public void onRefreshCache(boolean invalidCache) {
setLastUpdateTime(System.currentTimeMillis());
refreshMetaCacheOnly();
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,28 @@ public void testRefreshCatalog() throws Exception {
@Test
public void testRefreshCatalogLastUpdateTime() throws Exception {
CatalogIf test2 = env.getCatalogMgr().getCatalog("test2");
// init is 0
// lastUpdateTime is set when catalog is created (via resetToUninitialized -> onRefreshCache)
long l1 = test2.getLastUpdateTime();
Assertions.assertTrue(l1 == 0);
Assertions.assertTrue(l1 > 0);
TestExternalTable table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get();
// getDb() triggered init method
// getDb() triggered init method, but lastUpdateTime was already set
long l2 = test2.getLastUpdateTime();
Assertions.assertTrue(l2 > l1);
Assertions.assertTrue(l2 >= l1);
Assertions.assertFalse(table.isObjectCreated());
table.makeSureInitialized();
Assertions.assertTrue(table.isObjectCreated());

Thread.sleep(100); // wait a bit to ensure time difference
RefreshCatalogCommand refreshCatalogCommand = new RefreshCatalogCommand("test2", null);
Assertions.assertTrue(refreshCatalogCommand.isInvalidCache());
try {
refreshCatalogCommand.run(connectContext, null);
} catch (Exception e) {
// Do nothing
}
// not triggered init method
// refresh should update lastUpdateTime
long l3 = test2.getLastUpdateTime();
Assertions.assertTrue(l3 == l2);
Assertions.assertTrue(l3 > l2);
// the table will be recreated after refresh.
// so we need to get table again
table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_jdbc_catalog_refresh_update_time", "p0,external,mysql,external_docker,external_docker_mysql") {
String enabled = context.config.otherConfigs.get("enableJdbcTest")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String s3_endpoint = getS3Endpoint()
String bucket = getS3BucketName()
String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

if (enabled == null || !enabled.equalsIgnoreCase("true")) {
return
}

// Helper function to get LastUpdateTime for a catalog
// Column index: 0=CatalogId, 1=Name, 2=Type, 3=IsCurrent, 4=CreateTime, 5=LastUpdateTime
def getLastUpdateTime = { catalogName ->
def catalogs = sql """show catalogs"""
for (row in catalogs) {
if (row[1] == catalogName) {
return row[5]
}
}
return null
}

String mysql_port = context.config.otherConfigs.get("mysql_57_port")
String catalog_name = "test_refresh_update_time_catalog"

sql """drop catalog if exists ${catalog_name}"""

// Test 1: Manual refresh should update LastUpdateTime
sql """create catalog if not exists ${catalog_name} properties(
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver"
);"""

// Get initial LastUpdateTime
String initialUpdateTime = getLastUpdateTime(catalog_name)
assertNotNull(initialUpdateTime, "Catalog ${catalog_name} should exist")

// Wait a bit to ensure time difference
Thread.sleep(2000)

// Manual refresh
sql """refresh catalog ${catalog_name}"""

// Get LastUpdateTime after refresh
String afterRefreshUpdateTime = getLastUpdateTime(catalog_name)

// Verify LastUpdateTime changed after manual refresh
assertTrue(afterRefreshUpdateTime != initialUpdateTime,
"LastUpdateTime should change after manual refresh. Initial: ${initialUpdateTime}, After: ${afterRefreshUpdateTime}")

sql """drop catalog if exists ${catalog_name}"""

// Test 2: Scheduled refresh should update LastUpdateTime
String scheduled_catalog_name = "test_scheduled_refresh_catalog"
sql """drop catalog if exists ${scheduled_catalog_name}"""

sql """create catalog if not exists ${scheduled_catalog_name} properties(
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"metadata_refresh_interval_sec" = "5"
);"""

// Get initial LastUpdateTime
String scheduledInitialTime = getLastUpdateTime(scheduled_catalog_name)
assertNotNull(scheduledInitialTime, "Catalog ${scheduled_catalog_name} should exist")

// Wait for scheduled refresh (interval is 5 seconds, wait 8 seconds to be safe)
Thread.sleep(8000)

// Get LastUpdateTime after scheduled refresh
String scheduledAfterRefreshTime = getLastUpdateTime(scheduled_catalog_name)

// Verify LastUpdateTime changed after scheduled refresh
assertTrue(scheduledAfterRefreshTime != scheduledInitialTime,
"LastUpdateTime should change after scheduled refresh. Initial: ${scheduledInitialTime}, After: ${scheduledAfterRefreshTime}")

sql """drop catalog if exists ${scheduled_catalog_name}"""
}
Loading