From 9badaf0d33b96a3328b469c2817162688292c361 Mon Sep 17 00:00:00 2001 From: deardeng Date: Wed, 19 Nov 2025 18:30:01 +0800 Subject: [PATCH 1/2] [fix](cloud) Fix the issue where it takes a long time to come alive on first boot --- cloud/src/meta-service/meta_service_txn.cpp | 6 ++ .../org/apache/doris/system/HeartbeatMgr.java | 32 ++++++-- .../test_cloud_add_backend_heartbeat.groovy | 76 +++++++++++++++++++ 3 files changed, 108 insertions(+), 6 deletions(-) create mode 100644 regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index afd51268e5615f..914b16a039230a 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -3796,6 +3796,12 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); TxnErrorCode err = txn_kv_->create_txn(&txn); + bool inject_error = false; + TEST_SYNC_POINT_CALLBACK("MetaServiceImpl::abort_txn_with_coordinator", &inject_error); + if (inject_error) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + err = TxnErrorCode::TXN_TOO_OLD; + } if (err != TxnErrorCode::TXN_OK) { msg = "failed to create txn"; code = cast_as(err); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 8b0e351f3063f9..2fa2482d9da921 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -192,18 +192,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { boolean isChanged = be.handleHbResponse(hbResponse, isReplay); if (hbResponse.getStatus() == HbStatus.OK) { long newStartTime = be.getLastStartTime(); + // oldStartTime > 0 means it is not the first heartbeat if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be - && oldStartTime != newStartTime) { - Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( - be.getId(), be.getHost(), newStartTime); + && oldStartTime != newStartTime && oldStartTime > 0) { + submitAbortTxnTask(() -> Env.getCurrentGlobalTransactionMgr() + .abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime), + "restart"); } } else { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() - >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) { - Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown( - be.getId(), be.getHost(), 100); + >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L + && be.getLastUpdateMs() > 0) { + submitAbortTxnTask(() -> Env.getCurrentGlobalTransactionMgr() + .abortTxnWhenCoordinateBeDown(be.getId(), be.getHost(), 100), "down"); } } return isChanged; @@ -230,6 +233,23 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { return false; } + private void submitAbortTxnTask(Runnable task, String reason) { + Thread thread = new Thread(() -> { + long start = System.currentTimeMillis(); + LOG.info("start abort txn task, reason={}, start_ts={}", reason, start); + try { + task.run(); + long duration = System.currentTimeMillis() - start; + LOG.info("finish abort txn task, reason={}, start_ts={}, cost_ms={}", reason, start, duration); + } catch (Exception e) { + long duration = System.currentTimeMillis() - start; + LOG.warn("abort txn task({}) failed, start_ts={}, cost_ms={}", reason, start, duration, e); + } + }, "abort-txn-thread"); + thread.setDaemon(true); + thread.start(); + } + // backend heartbeat private class BackendHeartbeatHandler implements Callable { private Backend backend; diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy new file mode 100644 index 00000000000000..4e658438c20654 --- /dev/null +++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import groovy.json.JsonSlurper +import groovy.json.JsonOutput +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_cloud_add_backend_heartbeat", 'p0, docker') { + if (!isCloudMode()) { + return + } + + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + // curl "175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=[-1]" + def inject_to_ms_api = { msHttpPort, key, value, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" + check check_func + } + } + + def enable_ms_inject_api = { msHttpPort, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=enable" + check check_func + } + } + + docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort) + // inject point, to change abort_txn_with_coordinator + inject_to_ms_api.call(msHttpPort, "MetaServiceImpl::abort_txn_with_coordinator", URLEncoder.encode('[true]', "UTF-8")) { + respCode, body -> + log.info("inject MetaServiceImpl::abort_txn_with_coordinator resp: ${body} ${respCode}".toString()) + } + enable_ms_inject_api.call(msHttpPort) { + respCode, body -> + log.info("enable inject resp: ${body} ${respCode}".toString()) + } + cluster.addBackend(10, "new_cluster") + + sql """admin set frontend config("cloud_tablet_rebalancer_interval_second"="3");""" + + cluster.restartBackends(); + + } + +} \ No newline at end of file From c6cbb88710bd0e2da9ebc473cd8d05c412ec42f1 Mon Sep 17 00:00:00 2001 From: deardeng Date: Thu, 20 Nov 2025 14:58:55 +0800 Subject: [PATCH 2/2] fix review --- cloud/src/meta-service/meta_service_txn.cpp | 6 --- .../CloudGlobalTransactionMgr.java | 8 ++++ .../org/apache/doris/system/HeartbeatMgr.java | 40 +++++++++++-------- .../test_cloud_add_backend_heartbeat.groovy | 32 ++------------- 4 files changed, 35 insertions(+), 51 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 914b16a039230a..afd51268e5615f 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -3796,12 +3796,6 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); TxnErrorCode err = txn_kv_->create_txn(&txn); - bool inject_error = false; - TEST_SYNC_POINT_CALLBACK("MetaServiceImpl::abort_txn_with_coordinator", &inject_error); - if (inject_error) { - std::this_thread::sleep_for(std::chrono::seconds(5)); - err = TxnErrorCode::TXN_TOO_OLD; - } if (err != TxnErrorCode::TXN_OK) { msg = "failed to create txn"; code = cast_as(err); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 3cb313e02d6e03..00aa0f844f5244 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -2091,6 +2091,14 @@ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordina response = MetaServiceProxy .getInstance().abortTxnWithCoordinator(request); LOG.info("AbortTxnWithCoordinatorResponse: {}", response); + if (DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) { + LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow enabled, sleep 15s"); + try { + Thread.sleep(15 * 1000); + } catch (InterruptedException ie) { + LOG.info("error ", ie); + } + } } catch (RpcException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 2fa2482d9da921..077db66712a611 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -78,6 +78,7 @@ public class HeartbeatMgr extends MasterDaemon { private final ExecutorService executor; private SystemInfoService nodeMgr; private HeartbeatFlags heartbeatFlags; + private final ExecutorService abortTxnExecutor; private static volatile AtomicReference masterInfo = new AtomicReference<>(); @@ -86,6 +87,8 @@ public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); + this.abortTxnExecutor = ThreadPoolManager.newDaemonFixedThreadPool(1, + Config.heartbeat_mgr_blocking_queue_size, "abort-txn-executor", needRegisterMetric); this.heartbeatFlags = new HeartbeatFlags(); } @@ -195,7 +198,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { // oldStartTime > 0 means it is not the first heartbeat if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be && oldStartTime != newStartTime && oldStartTime > 0) { - submitAbortTxnTask(() -> Env.getCurrentGlobalTransactionMgr() + submitAbortTxnTaskByExecutor(() -> Env.getCurrentGlobalTransactionMgr() .abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime), "restart"); } @@ -205,7 +208,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L && be.getLastUpdateMs() > 0) { - submitAbortTxnTask(() -> Env.getCurrentGlobalTransactionMgr() + submitAbortTxnTaskByExecutor(() -> Env.getCurrentGlobalTransactionMgr() .abortTxnWhenCoordinateBeDown(be.getId(), be.getHost(), 100), "down"); } } @@ -233,21 +236,24 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { return false; } - private void submitAbortTxnTask(Runnable task, String reason) { - Thread thread = new Thread(() -> { - long start = System.currentTimeMillis(); - LOG.info("start abort txn task, reason={}, start_ts={}", reason, start); - try { - task.run(); - long duration = System.currentTimeMillis() - start; - LOG.info("finish abort txn task, reason={}, start_ts={}, cost_ms={}", reason, start, duration); - } catch (Exception e) { - long duration = System.currentTimeMillis() - start; - LOG.warn("abort txn task({}) failed, start_ts={}, cost_ms={}", reason, start, duration, e); - } - }, "abort-txn-thread"); - thread.setDaemon(true); - thread.start(); + private void submitAbortTxnTaskByExecutor(Runnable task, String reason) { + long start = System.currentTimeMillis(); + try { + abortTxnExecutor.submit(() -> { + LOG.info("start abort txn task, reason={}, start_ts={}", reason, start); + try { + task.run(); + long duration = System.currentTimeMillis() - start; + LOG.info("finish abort txn task, reason={}, start_ts={}, cost_ms={}", reason, start, duration); + } catch (Exception e) { + long duration = System.currentTimeMillis() - start; + LOG.warn("abort txn task({}) failed, start_ts={}, cost_ms={}", reason, start, duration, e); + } + }); + } catch (Exception e) { + long duration = System.currentTimeMillis() - start; + LOG.warn("failed to submit abort txn task({}), start_ts={}, cost_ms={}", reason, start, duration, e); + } } // backend heartbeat diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy index 4e658438c20654..bd1e5b9d0b5df3 100644 --- a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy +++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy @@ -31,40 +31,16 @@ suite("test_cloud_add_backend_heartbeat", 'p0, docker') { ] options.setFeNum(1) options.setBeNum(1) + options.enableDebugPoints() options.cloudMode = true - // curl "175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=[-1]" - def inject_to_ms_api = { msHttpPort, key, value, check_func -> - httpTest { - op "get" - endpoint msHttpPort - uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" - check check_func - } - } - - def enable_ms_inject_api = { msHttpPort, check_func -> - httpTest { - op "get" - endpoint msHttpPort - uri "/MetaService/http/v1/injection_point?token=${token}&op=enable" - check check_func - } - } - docker(options) { def ms = cluster.getAllMetaservices().get(0) def msHttpPort = ms.host + ":" + ms.httpPort logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort) - // inject point, to change abort_txn_with_coordinator - inject_to_ms_api.call(msHttpPort, "MetaServiceImpl::abort_txn_with_coordinator", URLEncoder.encode('[true]', "UTF-8")) { - respCode, body -> - log.info("inject MetaServiceImpl::abort_txn_with_coordinator resp: ${body} ${respCode}".toString()) - } - enable_ms_inject_api.call(msHttpPort) { - respCode, body -> - log.info("enable inject resp: ${body} ${respCode}".toString()) - } + + GetDebugPoint().enableDebugPointForAllFEs("FE.abortTxnWhenCoordinateBeRestart.slow") + cluster.addBackend(10, "new_cluster") sql """admin set frontend config("cloud_tablet_rebalancer_interval_second"="3");"""