diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 02497f6a044b91..b573e2c41eeb15 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -292,6 +292,9 @@ static std::string debug_info(const Request& req) { return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); } else if constexpr (is_any_v) { return fmt::format(" tablet_id={}", req.tablet_id()); + } else if constexpr (is_any_v) { + return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), + req.tablet_id(), req.lock_id()); } else { static_assert(!sizeof(Request)); } @@ -1106,6 +1109,25 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in return st; } +Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator) { + VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() + << ",lock_id:" << lock_id; + RemoveDeleteBitmapUpdateLockRequest req; + RemoveDeleteBitmapUpdateLockResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_tablet_id(tablet.tablet_id()); + req.set_lock_id(lock_id); + req.set_initiator(initiator); + auto st = retry_rpc("remove delete bitmap update lock", req, &res, + &MetaService_Stub::remove_delete_bitmap_update_lock); + if (!st.ok()) { + LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id() + << " lock_id=" << lock_id << " st=" << st.to_string(); + } + return st; +} + Status CloudMetaMgr::remove_old_version_delete_bitmap( int64_t tablet_id, const std::vector>& to_delete) { diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 79cdb3fd3d1f8c..0134469407a855 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -101,6 +101,9 @@ class CloudMetaMgr { Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator); + Status 
remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator); + Status remove_old_version_delete_bitmap( int64_t tablet_id, const std::vector>& to_delete); diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index f053c1877fb525..f9b11aa85b4897 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -74,6 +74,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", "get_delete_bitmap" BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", "get_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_bitmap"); +BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock( + "ms", "remove_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 2a9efe35302af4..4848ec4b456cef 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -173,6 +173,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_update_delete_bitmap; extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap; extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock; extern BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap; +extern BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock; extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 7adbc8ccf12aab..5f6aa2d25bc436 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ 
-2095,6 +2095,58 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl } } +void MetaServiceImpl::remove_delete_bitmap_update_lock( + google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) { + RPC_PREPROCESS(remove_delete_bitmap_update_lock); + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + if (cloud_unique_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "cloud unique id not set"; + return; + } + + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; + return; + } + + RPC_RATE_LIMIT(remove_delete_bitmap_update_lock) + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = "failed to init txn"; + return; + } + if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), + request->lock_id(), request->initiator())) { + LOG(WARNING) << "failed to check delete bitmap tablet lock" + << " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id() + << " request lock_id=" << request->lock_id() + << " request initiator=" << request->initiator() << " msg " << msg; + return; + } + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); + txn->remove(lock_key); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to remove delete bitmap tablet lock , err=" << err; + msg = ss.str(); + return; + } + + LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id() + << " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id() + << ", key=" << 
hex(lock_key) << ", initiator=" << request->initiator(); +} + void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, const RemoveDeleteBitmapRequest* request, RemoveDeleteBitmapResponse* response, diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index f60d795949b366..55e8626b6bf01b 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -274,6 +274,11 @@ class MetaServiceImpl : public cloud::MetaService { RemoveDeleteBitmapResponse* response, ::google::protobuf::Closure* done) override; + void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, + ::google::protobuf::Closure* done) override; + // cloud control get cluster's status by this api void get_cluster_status(google::protobuf::RpcController* controller, const GetClusterStatusRequest* request, @@ -647,6 +652,14 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::remove_delete_bitmap, controller, request, response, done); } + void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::remove_delete_bitmap_update_lock, controller, request, + response, done); + } + // cloud control get cluster's status by this api void get_cluster_status(google::protobuf::RpcController* controller, const GetClusterStatusRequest* request, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index c4d28fb3bc256c..904f3ec15d9c6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -345,6 +345,17 @@ public Cloud.GetDeleteBitmapUpdateLockResponse getDeleteBitmapUpdateLock( return blockingStub.getDeleteBitmapUpdateLock(request); } + public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock( + Cloud.RemoveDeleteBitmapUpdateLockRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.RemoveDeleteBitmapUpdateLockRequest.Builder builder = Cloud.RemoveDeleteBitmapUpdateLockRequest + .newBuilder(); + builder.mergeFrom(request); + return blockingStub.removeDeleteBitmapUpdateLock(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.removeDeleteBitmapUpdateLock(request); + } + public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { if (!request.hasCloudUniqueId()) { Cloud.GetInstanceRequest.Builder builder = Cloud.GetInstanceRequest.newBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 6ed0eb81b781fb..d7f718e3ca46bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -335,6 +335,12 @@ public Cloud.GetDeleteBitmapUpdateLockResponse getDeleteBitmapUpdateLock( return w.executeRequest((client) -> client.getDeleteBitmapUpdateLock(request)); } + public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock( + Cloud.RemoveDeleteBitmapUpdateLockRequest request) + throws RpcException { + return w.executeRequest((client) -> client.removeDeleteBitmapUpdateLock(request)); + } + public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { return w.executeRequest((client) -> client.alterObjStoreInfo(request)); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 131473470abf5d..9a7ee5bc86ec64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -56,6 +56,8 @@ import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; +import org.apache.doris.cloud.proto.Cloud.RemoveDeleteBitmapUpdateLockRequest; +import org.apache.doris.cloud.proto.Cloud.RemoveDeleteBitmapUpdateLockResponse; import org.apache.doris.cloud.proto.Cloud.SubTxnInfo; import org.apache.doris.cloud.proto.Cloud.TableStatsPB; import org.apache.doris.cloud.proto.Cloud.TabletIndexPB; @@ -648,7 +650,13 @@ private void calcDeleteBitmapForMow(long dbId, List tableList, long t Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); + try { + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); + } catch (UserException e) { + LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage()); + removeDeleteBitmapUpdateLock(tableToPartitions, transactionId); + throw e; + } } private void getPartitionInfo(List tableList, @@ -869,6 +877,33 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo } } + private void removeDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) { + for (Map.Entry> entry : tableToParttions.entrySet()) { + RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); + 
builder.setTableId(entry.getKey()) + .setLockId(transactionId) + .setInitiator(-1); + final RemoveDeleteBitmapUpdateLockRequest request = builder.build(); + RemoveDeleteBitmapUpdateLockResponse response = null; + try { + response = MetaServiceProxy.getInstance().removeDeleteBitmapUpdateLock(request); + if (LOG.isDebugEnabled()) { + LOG.debug("remove delete bitmap lock, transactionId={}, Request: {}, Response: {}", + transactionId, request, response); + } + Preconditions.checkNotNull(response); + Preconditions.checkNotNull(response.getStatus()); + if (response.getStatus().getCode() != MetaServiceCode.OK) { + LOG.warn("remove delete bitmap lock failed, transactionId={}, response:{}", + transactionId, response); + } + } catch (Exception e) { + LOG.warn("ignore remove delete bitmap lock exception, transactionId={}, exception={}", + transactionId, e); + } + } + } + private void sendCalcDeleteBitmaptask(long dbId, long transactionId, Map> backendToPartitionInfos) throws UserException { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 8ae48851601b99..fb112237a82b5c 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1453,6 +1453,18 @@ message GetDeleteBitmapUpdateLockResponse { repeated int64 cumulative_points = 4; } +message RemoveDeleteBitmapUpdateLockRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 table_id = 2; + optional int64 tablet_id = 3; + optional int64 lock_id = 4; + optional int64 initiator = 5; +} + +message RemoveDeleteBitmapUpdateLockResponse { + optional MetaServiceResponseStatus status = 1; +} + message GetRLTaskCommitAttachRequest { optional string cloud_unique_id = 1; // For auth optional int64 db_id = 2; @@ -1564,6 +1576,7 @@ service MetaService { rpc update_delete_bitmap(UpdateDeleteBitmapRequest) returns(UpdateDeleteBitmapResponse); rpc get_delete_bitmap(GetDeleteBitmapRequest) returns(GetDeleteBitmapResponse); rpc get_delete_bitmap_update_lock(GetDeleteBitmapUpdateLockRequest) 
returns(GetDeleteBitmapUpdateLockResponse); + rpc remove_delete_bitmap_update_lock(RemoveDeleteBitmapUpdateLockRequest) returns(RemoveDeleteBitmapUpdateLockResponse); rpc remove_delete_bitmap(RemoveDeleteBitmapRequest) returns(RemoveDeleteBitmapResponse); // routine load progress diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out new file mode 100644 index 00000000000000..b8b3ea3eccac14 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy index 035a6307d46e20..1bd4d87742c6cb 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy @@ -176,7 +176,7 @@ suite("test_cloud_mow_broker_load_with_retry", "nonConcurrent") { ++i } } finally { - GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") sql "DROP TABLE IF EXISTS ${table};" GetDebugPoint().clearDebugPointsForAllBEs() } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy new file mode 100644 index 00000000000000..122503b1611f11 --- /dev/null +++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { + if (!isCloudMode()) { + return + } + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for each be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for each be node, restore paramName to its saved original value + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def 
original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for each be node, read the current param value and save it + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2, + meta_service_rpc_retry_times : 5 + ] + + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") + + + def tableName = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // this streamLoad will fail on calculate delete bitmap timeout + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + 
time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("Timeout")) + } + } + qt_sql """ select * from ${tableName} order by id""" + + // this streamLoad will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + reset_be_param("mow_stream_load_commit_retry_times") + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllBEs() + } + + } +} \ No newline at end of file