From 03f0c849bc530a5348495b6575b1a661e294090f Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 7 Aug 2024 12:19:06 +0800 Subject: [PATCH 01/10] fix --- be/src/agent/task_worker_pool.cpp | 1 + .../main/java/org/apache/doris/master/MasterImpl.java | 9 +++++++++ .../java/org/apache/doris/task/CalcDeleteBitmapTask.java | 4 ++++ gensrc/thrift/MasterService.thrift | 2 ++ 4 files changed, 16 insertions(+) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 7bbd602f571ede..8a6378794e92d5 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -2052,6 +2052,7 @@ void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskReq finish_task_request.__set_signature(req.signature); finish_task_request.__set_report_version(s_report_version); finish_task_request.__set_error_tablet_ids(error_tablet_ids); + finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 0eef0c684d6062..ed92442c7ee935 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -671,6 +671,15 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize() + ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString()); + } else if (request.isSetRespPartitions() + && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) { + LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's" + + "partitionInfos: {}. response's partitionInfos: {}", task.getBackendId(), + request.getReportVersion(), + calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos().toString(), + request.getRespPartitions().toString()); + calcDeleteBitmapTask.countDownToZero(TStatusCode.INTERNAL_ERROR, "get staled response from backend " + + task.getBackendId() + ", report version: " + request.getReportVersion()); } else { calcDeleteBitmapTask.countDownLatch(task.getBackendId(), calcDeleteBitmapTask.getTransactionId()); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java index 4188cf61849a91..49a653c7a32c26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java @@ -79,6 +79,10 @@ public void countDownToZero(TStatusCode code, String errMsg) { } } + public boolean isFinishRequestStale(List respPartitionInfos) { + return !respPartitionInfos.equals(partitionInfos); + } + public void setLatch(MarkedCountDownLatch latch) { this.latch = latch; } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 1db7a109f55078..ecedf0ee1afad5 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -72,6 +72,8 @@ struct TFinishTaskRequest { 17: optional map succ_tablets 18: optional map table_id_to_delta_num_rows 19: optional map> table_id_to_tablet_id_to_delta_num_rows + // for Cloud mow table only, used by FE to check if the response is for the latest request + 20: optional list resp_partitions; } struct TTablet { From e59c6419ea3588377ca4de227e1c3a046e95d02d Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 7 Aug 2024 19:52:13 +0800 Subject: [PATCH 02/10] use DELETE_BITMAP_LOCK_ERROR as error code to let coordinator retry and add cases --- be/src/olap/base_tablet.cpp | 15 +++ .../org/apache/doris/master/MasterImpl.java | 6 +- ...d_mow_stale_resp_load_load_conflict.groovy | 97 +++++++++++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 4ca36684383939..7655b349957301 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1296,6 +1296,21 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf } } + DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "BaseTablet::update_delete_bitmap.block"); + if (block_dp) { + auto wait_token = block_dp->param("wait_token", ""); + if (wait_token != token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + if (!rowsets_skip_alignment.empty()) { auto token = self->calc_delete_bitmap_executor()->create_token(); // set rowset_writer to nullptr to skip the alignment process diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index ed92442c7ee935..4e01f3a5058774 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -678,8 +678,10 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) request.getReportVersion(), calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos().toString(), request.getRespPartitions().toString()); - calcDeleteBitmapTask.countDownToZero(TStatusCode.INTERNAL_ERROR, "get staled response from backend " - + task.getBackendId() + ", report version: " + request.getReportVersion()); + // DELETE_BITMAP_LOCK_ERROR will be retried + calcDeleteBitmapTask.countDownToZero(TStatusCode.DELETE_BITMAP_LOCK_ERROR, + "get staled response from backend " + task.getBackendId() + ", report version: " + + request.getReportVersion()); } else { calcDeleteBitmapTask.countDownLatch(task.getBackendId(), calcDeleteBitmapTask.getTransactionId()); if (LOG.isDebugEnabled()) { diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy new file mode 100644 index 00000000000000..bf3970601a7c55 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def table1 = "test_cloud_mow_stale_resp_load_load_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the first load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) + + // the first load + // streamLoad { + // table "${table1}" + // set 'column_separator', ',' + // set 'format', 'csv' + // set 'columns', 'k1,c1,c2' + // file 'cloud_mow_stale_resp.csv' + // time 100000 // 100s + // } + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } + + // wait util the first load's delete bitmap update lock expired + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + + // the second load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token2"]) + Thread.sleep(200) + + sql "insert into ${table1}(k1,c1,c2) values(1,666,666),(2,555,555);" + + order_qt_sql "select * from ${table1};" + + // GetDebugPoint().enableDebugPointForAllFEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + + // keep waiting to let the coordinator BE to retry to commit the first load's txn + Thread.sleep(12 * 1000) + + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +} From a28341ef6b108a7b34cd83e9cec25ccacd19b596 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 7 Aug 2024 22:22:08 +0800 Subject: [PATCH 03/10] [DOING] fix retry not re-calculate the delete bitmap --- .../cloud_engine_calc_delete_bitmap_task.cpp | 10 ++++++++-- be/src/cloud/cloud_tablet.cpp | 5 +++-- be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 17 +++++++++++++---- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 7 +++++-- be/src/olap/txn_manager.h | 1 + ...oud_mow_stale_resp_load_load_conflict.groovy | 17 +++++------------ 6 files changed, 35 insertions(+), 22 deletions(-) diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 5d1a957d14df19..987ed4a00adcac 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -186,9 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { std::shared_ptr partial_update_info; std::shared_ptr publish_status; int64_t txn_expiration; + int64_t previous_pubished_version {-1}; Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info( _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration, - &partial_update_info, &publish_status); + &partial_update_info, &publish_status, &previous_pubished_version); if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << status; @@ -204,8 +205,13 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.rowset_ids = rowset_ids; txn_info.partial_update_info = partial_update_info; txn_info.publish_status = publish_status; + txn_info.publish_version = _version; auto update_delete_bitmap_time_us = 0; - if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) { + if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) && + _version == previous_pubished_version) { + // if _version > previous_pubished_version, it means that this is a retry and there are + // compaction or other loads finished successfully on the same tablet. So the previous publish + // is stale and we should re-calculate the delete bitmap LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id << ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap."; } else { diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 17ec1fe22b0d85..ff08899e73bc89 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -648,8 +648,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get())); - _engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED); + _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), new_delete_bitmap, + cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_version); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 583992e76f7aba..2a43919519f3b8 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -27,6 +27,7 @@ #include "cpp/sync_point.h" #include "olap/olap_common.h" #include "olap/tablet_meta.h" +#include "olap/txn_manager.h" namespace doris { @@ -54,7 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status) { + std::shared_ptr* publish_status, int64_t* previous_published_version) { { std::shared_lock rlock(_rwlock); TxnKey key(transaction_id, tablet_id); @@ -68,6 +69,9 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( *txn_expiration = iter->second.txn_expiration; *partial_update_info = iter->second.partial_update_info; *publish_status = iter->second.publish_status; + if (iter->second.publish_version != -1) { + *previous_published_version = iter->second.publish_version; + } } RETURN_IF_ERROR( get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr)); @@ -153,12 +157,17 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status) { + PublishStatus publish_status, + int64_t publish_version) { { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); - CHECK(_txn_map.count(txn_key) > 0); - *(_txn_map[txn_key].publish_status.get()) = publish_status; + CHECK(_txn_map.contains(txn_key)); + TxnVal& txn_val = _txn_map[txn_key]; + *(txn_val.publish_status) = publish_status; + if (publish_status == PublishStatus::SUCCEED && publish_version != -1) { + txn_val.publish_version = publish_version; + } } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); CacheKey key(key_str); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 5012db6b8e5bf3..0859c94ab81034 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -42,7 +42,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status); + std::shared_ptr* publish_status, + int64_t* previous_published_version); void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, @@ -52,7 +53,7 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status); + PublishStatus publish_status, int64_t publish_version = -1); void remove_expired_tablet_txn_info(); @@ -88,6 +89,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { int64_t txn_expiration; std::shared_ptr partial_update_info; std::shared_ptr publish_status = nullptr; + // used to determine if the retry needs to re-calculate the delete bitmap + int64_t publish_version {-1}; TxnVal() : txn_expiration(0) {}; TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_, std::shared_ptr partial_update_info_, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5a0a74c76a2825..9b733566b40a10 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -75,6 +75,7 @@ struct TabletTxnInfo { bool ingest {false}; std::shared_ptr partial_update_info; std::shared_ptr publish_status; + int64_t publish_version; TxnState state {TxnState::PREPARED}; TabletTxnInfo() = default; diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy index bf3970601a7c55..858c2fdcb05c5b 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy @@ -48,14 +48,6 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) // the first load - // streamLoad { - // table "${table1}" - // set 'column_separator', ',' - // set 'format', 'csv' - // set 'columns', 'k1,c1,c2' - // file 'cloud_mow_stale_resp.csv' - // time 100000 // 100s - // } t1 = Thread.start { sql "insert into ${table1} values(1,999,999),(2,888,888);" } @@ -73,10 +65,11 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { order_qt_sql "select * from ${table1};" - // GetDebugPoint().enableDebugPointForAllFEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + // GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) - // keep waiting to let the coordinator BE to retry to commit the first load's txn - Thread.sleep(12 * 1000) + // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to commit the first load's txn + Thread.sleep(15 * 1000) // let the first partial update load finish GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") @@ -93,5 +86,5 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() } - sql "DROP TABLE IF EXISTS ${table1};" + // sql "DROP TABLE IF EXISTS ${table1};" } From 5ac9fa824af96b44906d050d6d7eeac6477f62a2 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 00:19:29 +0800 Subject: [PATCH 04/10] add check for compaction stats on BE and store delete bitmap with sentinel mark in _ms_base_compaction_cnt --- .../cloud_engine_calc_delete_bitmap_task.cpp | 14 +++++++--- be/src/cloud/cloud_tablet.cpp | 5 +++- .../cloud/cloud_txn_delete_bitmap_cache.cpp | 27 +++++++++++-------- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 11 ++++++-- ...loud_mow_stale_resp_load_load_conflict.out | 16 +++++++++++ 5 files changed, 56 insertions(+), 17 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 987ed4a00adcac..a4eaafcc957f91 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -187,9 +187,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { std::shared_ptr publish_status; int64_t txn_expiration; int64_t previous_pubished_version {-1}; + int64_t previous_base_compaction_cnt {-1}; + int64_t previous_cumulative_compaction_cnt {-1}; + int64_t previous_cumulative_point {-1}; Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info( _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration, - &partial_update_info, &publish_status, &previous_pubished_version); + &partial_update_info, &publish_status, &previous_pubished_version, + &previous_base_compaction_cnt, &previous_cumulative_compaction_cnt, + &previous_cumulative_point); if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << status; @@ -208,8 +213,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.publish_version = _version; auto update_delete_bitmap_time_us = 0; if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) && - _version == previous_pubished_version) { - // if _version > previous_pubished_version, it means that this is a retry and there are + _version == previous_pubished_version && + _ms_base_compaction_cnt == previous_base_compaction_cnt && + _ms_cumulative_compaction_cnt == previous_cumulative_compaction_cnt && + _ms_cumulative_point == previous_cumulative_point) { + // if version or compaction stats can't match, it means that this is a retry and there are // compaction or other loads finished successfully on the same tablet. So the previous publish // is stale and we should re-calculate the delete bitmap LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index ff08899e73bc89..7088831b4f2232 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -648,7 +648,10 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get())); - _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), new_delete_bitmap, + // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn retries for some reason, + // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap. If we store the new_delete_bitmap + // the delete bitmap correctness check will fail + _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, txn_info->publish_version); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 2a43919519f3b8..79f98212075a32 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -55,7 +55,9 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status, int64_t* previous_published_version) { + std::shared_ptr* publish_status, int64_t* previous_published_version, + int64_t* base_compaction_cnt, int64_t* cumulative_compaction_cnt, + int64_t* cumulative_point) { { std::shared_lock rlock(_rwlock); TxnKey key(transaction_id, tablet_id); @@ -69,9 +71,10 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( *txn_expiration = iter->second.txn_expiration; *partial_update_info = iter->second.partial_update_info; *publish_status = iter->second.publish_status; - if (iter->second.publish_version != -1) { - *previous_published_version = iter->second.publish_version; - } + *previous_published_version = iter->second.publish_version; + *base_compaction_cnt = iter->second.base_compaction_cnt; + *cumulative_compaction_cnt = iter->second.cumulative_compaction_cnt; + *cumulative_point = iter->second.cumulative_point; } RETURN_IF_ERROR( get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr)); @@ -153,20 +156,22 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( .tag("delete_bitmap_size", charge); } -void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id, - int64_t tablet_id, - DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status, - int64_t publish_version) { +void CloudTxnDeleteBitmapCache::update_tablet_txn_info( + TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, PublishStatus publish_status, + int64_t publish_version, int64_t base_compaction_cnt, int64_t cumulative_compaction_cnt, + int64_t cumulative_point) { { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); CHECK(_txn_map.contains(txn_key)); TxnVal& txn_val = _txn_map[txn_key]; *(txn_val.publish_status) = publish_status; - if (publish_status == PublishStatus::SUCCEED && publish_version != -1) { + if (publish_status == PublishStatus::SUCCEED) { txn_val.publish_version = publish_version; + txn_val.base_compaction_cnt = base_compaction_cnt; + txn_val.cumulative_compaction_cnt = cumulative_compaction_cnt, + txn_val.cumulative_point = cumulative_point; } } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 0859c94ab81034..57858003f4d37a 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -43,7 +43,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, std::shared_ptr* publish_status, - int64_t* previous_published_version); + int64_t* previous_published_version, int64_t* base_compaction_cnt, + int64_t* cumulative_compaction_cnt, int64_t* cumulative_point); void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, @@ -53,7 +54,10 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status, int64_t publish_version = -1); + PublishStatus publish_status, int64_t publish_version = -1, + int64_t base_compaction_cnt = -1, + int64_t cumulative_compaction_cnt = -1, + int64_t cumulative_point = -1); void remove_expired_tablet_txn_info(); @@ -91,6 +95,9 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { std::shared_ptr publish_status = nullptr; // used to determine if the retry needs to re-calculate the delete bitmap int64_t publish_version {-1}; + int64_t base_compaction_cnt {-1}; + int64_t cumulative_compaction_cnt {-1}; + int64_t cumulative_point {-1}; TxnVal() : txn_expiration(0) {}; TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_, std::shared_ptr partial_update_info_, diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out new file mode 100644 index 00000000000000..6fd2178fd94ac9 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 666 666 +2 555 555 +3 3 3 + +-- !sql -- +1 999 999 +2 888 888 +3 3 3 + From 38dbb94e6406e2570a6a549e09dbd446a875b385 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 00:33:40 +0800 Subject: [PATCH 05/10] add load-compaction conflict case --- ...ow_stale_resp_load_compaction_conflict.out | 16 +++ ...stale_resp_load_compaction_conflict.groovy | 121 ++++++++++++++++++ ...d_mow_stale_resp_load_load_conflict.groovy | 3 +- 3 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out new file mode 100644 index 00000000000000..09882a909b391b --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 999 999 +2 888 888 +3 3 3 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy new file mode 100644 index 00000000000000..2a7b8a07e16e65 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_stale_resp_load_compaction_conflict", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def table1 = "test_cloud_mow_stale_resp_load_compaction_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the first load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) + + // the first load + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } + + // wait util the first load's delete bitmap update lock expired + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + order_qt_sql "select * from ${table1};" + + + // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to commit the first load's txn + Thread.sleep(15 * 1000) + + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy index 858c2fdcb05c5b..3a1ee882616771 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy @@ -65,7 +65,6 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { order_qt_sql "select * from ${table1};" - // GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) // and the coordinator BE will retry to commit the first load's txn @@ -86,5 +85,5 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() } - // sql "DROP TABLE IF EXISTS ${table1};" + sql "DROP TABLE IF EXISTS ${table1};" } From b1d2069eefdd6380ed9694e18fea7b65e57c4112 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 00:39:25 +0800 Subject: [PATCH 06/10] fix not update compaction stats --- be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp | 3 +++ be/src/cloud/cloud_tablet.cpp | 7 ++++--- be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 1 - be/src/olap/txn_manager.h | 5 ++++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index a4eaafcc957f91..378f8bbcb54f81 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -211,6 +211,9 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.partial_update_info = partial_update_info; txn_info.publish_status = publish_status; txn_info.publish_version = _version; + txn_info.base_compaction_cnt = _ms_base_compaction_cnt; + txn_info.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt, + txn_info.cumulative_point = _ms_cumulative_point; auto update_delete_bitmap_time_us = 0; if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) && _version == previous_pubished_version && diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 7088831b4f2232..a6cc483c15953d 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -651,9 +651,10 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn retries for some reason, // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap. If we store the new_delete_bitmap // the delete bitmap correctness check will fail - _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, - cur_rowset_ids, PublishStatus::SUCCEED, - txn_info->publish_version); + _engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_version, txn_info->base_compaction_cnt, + txn_info->cumulative_compaction_cnt, txn_info->cumulative_point); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 79f98212075a32..b7402cd1970a17 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -27,7 +27,6 @@ #include "cpp/sync_point.h" #include "olap/olap_common.h" #include "olap/tablet_meta.h" -#include "olap/txn_manager.h" namespace doris { diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 9b733566b40a10..5f4d2ebee615b9 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -75,7 +75,10 @@ struct TabletTxnInfo { bool ingest {false}; std::shared_ptr partial_update_info; std::shared_ptr publish_status; - int64_t publish_version; + int64_t publish_version {-1}; + int64_t base_compaction_cnt {-1}; + int64_t cumulative_compaction_cnt {-1}; + int64_t cumulative_point {-1}; TxnState state {TxnState::PREPARED}; TabletTxnInfo() = default; From 3be0b37af2be6d8888d343ae6e2e6335404975aa Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 14:45:05 +0800 Subject: [PATCH 07/10] remove sentinel marks from delete bitmap when using delete bitmap from cache --- be/src/cloud/cloud_meta_mgr.cpp | 11 ++++++++--- be/src/cloud/cloud_tablet.cpp | 7 ++++--- be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 4 +++- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 2 ++ be/src/olap/base_tablet.cpp | 13 +------------ be/src/olap/base_tablet.h | 1 - be/src/olap/tablet_meta.cpp | 10 ++++++++++ be/src/olap/tablet_meta.h | 2 ++ 8 files changed, 30 insertions(+), 20 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index e743ea9b12c8ce..7a01590c662523 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -559,13 +559,18 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64 } txn_processed.insert(txn_id); DeleteBitmapPtr tmp_delete_bitmap; - RowsetIdUnorderedSet tmp_rowset_ids; std::shared_ptr publish_status = std::make_shared(PublishStatus::INIT); CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( - txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status); - if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) { + txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status); + // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services. + // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can + // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other + // stats(version or compaction stats) + if (status.ok() && *publish_status == PublishStatus::SUCCEED) { + // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap + tmp_delete_bitmap->remove_sentinel_marks(); delete_bitmap->merge(*tmp_delete_bitmap); engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet->tablet_id()); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index a6cc483c15953d..c4ea50ca87b0c4 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -648,9 +648,10 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get())); - // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn retries for some reason, - // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap. If we store the new_delete_bitmap - // the delete bitmap correctness check will fail + + // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, + // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do + // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail _engine.txn_delete_bitmap_cache().update_tablet_txn_info( txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, txn_info->publish_version, txn_info->base_compaction_cnt, diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index b7402cd1970a17..758a740ab3c2a4 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -102,7 +102,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap( handle == nullptr ? nullptr : reinterpret_cast(value(handle)); if (val) { *delete_bitmap = val->delete_bitmap; - *rowset_ids = val->rowset_ids; + if (rowset_ids) { + *rowset_ids = val->rowset_ids; + } // must call release handle to reduce the reference count, // otherwise there will be memory leak release(handle); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 57858003f4d37a..94e759ae345b00 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -63,6 +63,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id); + // !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks, + // when using delete bitmap from this cache, the caller should remove them manually if don't need it Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, std::shared_ptr* publish_status); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 7655b349957301..0fb12dd074f8b0 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1208,17 +1208,6 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap return Status::OK(); } -void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap) { - for (auto it = delete_bitmap->delete_bitmap.begin(), end = delete_bitmap->delete_bitmap.end(); - it != end;) { - if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) { - it = delete_bitmap->delete_bitmap.erase(it); - } else { - ++it; - } - } -} - Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, int64_t txn_id, int64_t txn_expiration) { SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); @@ -1559,7 +1548,7 @@ Status BaseTablet::update_delete_bitmap_without_lock( if (!st.ok()) { LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); } - self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); + delete_bitmap->remove_sentinel_marks(); } for (auto& iter : delete_bitmap->delete_bitmap) { self->_tablet_meta->delete_bitmap().merge( diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f958d398fd5d00..d329c786fc9781 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -289,7 +289,6 @@ class BaseTablet { static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del); - static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap); Status _capture_consistent_rowsets_unlocked(const std::vector& version_path, std::vector* rowsets) const; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index a3526781dddd87..ed9a446551d00e 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1080,6 +1080,16 @@ bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row return false; } +void DeleteBitmap::remove_sentinel_marks() { + for (auto it = delete_bitmap.begin(), end = delete_bitmap.end(); it != end;) { + if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) { + it = delete_bitmap.erase(it); + } else { + ++it; + } + } +} + int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) { std::lock_guard l(lock); auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 32c6fde568c87b..bb6b5b8cd51725 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -516,6 +516,8 @@ class DeleteBitmap { */ std::shared_ptr get_agg(const BitmapKey& bmk) const; + void remove_sentinel_marks(); + class AggCachePolicy : public LRUCachePolicyTrackingManual { public: AggCachePolicy(size_t capacity) From 671a226f0c59bc849172c76da41a8799e02b4764 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 15:35:48 +0800 Subject: [PATCH 08/10] store publish stats in TxnPublishInfo --- .../cloud_engine_calc_delete_bitmap_task.cpp | 25 ++++++------- be/src/cloud/cloud_tablet.cpp | 7 ++-- .../cloud/cloud_txn_delete_bitmap_cache.cpp | 26 ++++++-------- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 13 ++----- be/src/olap/txn_manager.h | 36 ++++++++++++------- 5 files changed, 50 insertions(+), 57 deletions(-) diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 378f8bbcb54f81..b6c9aa318f387c 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -186,15 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { std::shared_ptr partial_update_info; std::shared_ptr publish_status; int64_t txn_expiration; - int64_t previous_pubished_version {-1}; - int64_t previous_base_compaction_cnt {-1}; - int64_t previous_cumulative_compaction_cnt {-1}; - int64_t previous_cumulative_point {-1}; + TxnPublishInfo previous_publish_info; Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info( _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration, - &partial_update_info, &publish_status, &previous_pubished_version, - &previous_base_compaction_cnt, &previous_cumulative_compaction_cnt, - &previous_cumulative_point); + &partial_update_info, &publish_status, &previous_publish_info); if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << status; @@ -210,16 +205,16 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.rowset_ids = rowset_ids; txn_info.partial_update_info = partial_update_info; txn_info.publish_status = publish_status; - txn_info.publish_version = _version; - txn_info.base_compaction_cnt = _ms_base_compaction_cnt; - txn_info.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt, - txn_info.cumulative_point = _ms_cumulative_point; + txn_info.publish_info = {.publish_version = _version, + .base_compaction_cnt = _ms_base_compaction_cnt, + .cumulative_compaction_cnt = _ms_cumulative_compaction_cnt, + .cumulative_point = _ms_cumulative_point}; auto update_delete_bitmap_time_us = 0; if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) && - _version == previous_pubished_version && - _ms_base_compaction_cnt == previous_base_compaction_cnt && - _ms_cumulative_compaction_cnt == previous_cumulative_compaction_cnt && - _ms_cumulative_point == previous_cumulative_point) { + _version == previous_publish_info.publish_version && + _ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt && + _ms_cumulative_compaction_cnt == previous_publish_info.cumulative_compaction_cnt && + _ms_cumulative_point == previous_publish_info.cumulative_point) { // if version or compaction stats can't match, it means that this is a retry and there are // compaction or other loads finished successfully on the same tablet. So the previous publish // is stale and we should re-calculate the delete bitmap diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c4ea50ca87b0c4..e7a5871e81b150 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -652,10 +652,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail - _engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, - txn_info->publish_version, txn_info->base_compaction_cnt, - txn_info->cumulative_compaction_cnt, txn_info->cumulative_point); + _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, + cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_info); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 758a740ab3c2a4..c6a3b54edc3f67 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -27,6 +27,7 @@ #include "cpp/sync_point.h" #include "olap/olap_common.h" #include "olap/tablet_meta.h" +#include "olap/txn_manager.h" namespace doris { @@ -54,9 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status, int64_t* previous_published_version, - int64_t* base_compaction_cnt, int64_t* cumulative_compaction_cnt, - int64_t* cumulative_point) { + std::shared_ptr* publish_status, TxnPublishInfo* previous_publish_info) { { std::shared_lock rlock(_rwlock); TxnKey key(transaction_id, tablet_id); @@ -70,10 +69,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( *txn_expiration = iter->second.txn_expiration; *partial_update_info = iter->second.partial_update_info; *publish_status = iter->second.publish_status; - *previous_published_version = iter->second.publish_version; - *base_compaction_cnt = iter->second.base_compaction_cnt; - *cumulative_compaction_cnt = iter->second.cumulative_compaction_cnt; - *cumulative_point = iter->second.cumulative_point; + *previous_publish_info = iter->second.publish_info; } RETURN_IF_ERROR( get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr)); @@ -157,11 +153,12 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( .tag("delete_bitmap_size", charge); } -void CloudTxnDeleteBitmapCache::update_tablet_txn_info( - TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids, PublishStatus publish_status, - int64_t publish_version, int64_t base_compaction_cnt, int64_t cumulative_compaction_cnt, - int64_t cumulative_point) { +void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id, + int64_t tablet_id, + DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + PublishStatus publish_status, + TxnPublishInfo publish_info) { { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); @@ -169,10 +166,7 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info( TxnVal& txn_val = _txn_map[txn_key]; *(txn_val.publish_status) = publish_status; if (publish_status == PublishStatus::SUCCEED) { - txn_val.publish_version = publish_version; - txn_val.base_compaction_cnt = base_compaction_cnt; - txn_val.cumulative_compaction_cnt = cumulative_compaction_cnt, - txn_val.cumulative_point = cumulative_point; + txn_val.publish_info = publish_info; } } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 94e759ae345b00..b462ce9e1bde86 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -43,8 +43,7 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, std::shared_ptr* publish_status, - int64_t* previous_published_version, int64_t* base_compaction_cnt, - int64_t* cumulative_compaction_cnt, int64_t* cumulative_point); + TxnPublishInfo* previous_publish_info); void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, @@ -54,10 +53,7 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status, int64_t publish_version = -1, - int64_t base_compaction_cnt = -1, - int64_t cumulative_compaction_cnt = -1, - int64_t cumulative_point = -1); + PublishStatus publish_status, TxnPublishInfo publish_info = {}); void remove_expired_tablet_txn_info(); @@ -96,10 +92,7 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { std::shared_ptr partial_update_info; std::shared_ptr publish_status = nullptr; // used to determine if the retry needs to re-calculate the delete bitmap - int64_t publish_version {-1}; - int64_t base_compaction_cnt {-1}; - int64_t cumulative_compaction_cnt {-1}; - int64_t cumulative_point {-1}; + TxnPublishInfo publish_info; TxnVal() : txn_expiration(0) {}; TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_, std::shared_ptr partial_update_info_, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5f4d2ebee615b9..5944bbf0fc3136 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -63,6 +63,13 @@ enum class TxnState { }; enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 }; +struct TxnPublishInfo { + int64_t publish_version {-1}; + int64_t base_compaction_cnt {-1}; + int64_t cumulative_compaction_cnt {-1}; + int64_t cumulative_point {-1}; +}; + struct TabletTxnInfo { PUniqueId load_id; RowsetSharedPtr rowset; @@ -74,28 +81,33 @@ struct TabletTxnInfo { int64_t creation_time; bool ingest {false}; std::shared_ptr partial_update_info; + + // for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask + // needs to re-calculate the delete bitmap std::shared_ptr publish_status; - int64_t publish_version {-1}; - int64_t base_compaction_cnt {-1}; - int64_t cumulative_compaction_cnt {-1}; - int64_t cumulative_point {-1}; - TxnState state {TxnState::PREPARED}; + TxnPublishInfo publish_info; + TxnState state {TxnState::PREPARED}; TabletTxnInfo() = default; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} + : load_id(std::move(load_id)), + rowset(std::move(rowset)), + creation_time(UnixSeconds()) {} TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} + : load_id(std::move(load_id)), + rowset(std::move(rowset)), + creation_time(UnixSeconds()), + ingest(ingest_arg) {} TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, - DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) - : load_id(load_id), - rowset(rowset), + DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids) + : load_id(std::move(load_id)), + rowset(std::move(rowset)), unique_key_merge_on_write(merge_on_write), - delete_bitmap(delete_bitmap), - rowset_ids(ids), + delete_bitmap(std::move(delete_bitmap)), + rowset_ids(std::move(ids)), creation_time(UnixSeconds()) {} void prepare() { state = TxnState::PREPARED; } From 29b17d96155ae3f8de4770a26737b5c56f10720f Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 8 Aug 2024 16:11:41 +0800 Subject: [PATCH 09/10] use customized delete bitmap expiration and timeout fe config in regression case --- ...stale_resp_load_compaction_conflict.groovy | 176 +++++++++--------- ...d_mow_stale_resp_load_load_conflict.groovy | 120 ++++++------ 2 files changed, 156 insertions(+), 140 deletions(-) diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy index 2a7b8a07e16e65..8f4fa45700b81f 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy @@ -24,98 +24,106 @@ suite("test_cloud_mow_stale_resp_load_compaction_conflict", "nonConcurrent") { return } - def table1 = "test_cloud_mow_stale_resp_load_compaction_conflict" - sql "DROP TABLE IF EXISTS ${table1} FORCE;" - sql """ CREATE TABLE IF NOT EXISTS ${table1} ( - `k1` int NOT NULL, - `c1` int, - `c2` int - )UNIQUE KEY(k1) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "disable_auto_compaction" = "true", - "replication_num" = "1"); """ - - sql "insert into ${table1} values(1,1,1);" - sql "insert into ${table1} values(2,2,2);" - sql "insert into ${table1} values(3,3,3);" - sql "sync;" - order_qt_sql "select * from ${table1};" - - - def beNodes = sql_return_maparray("show backends;") - def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) - def tabletBackendId = tabletStat.BackendId - def tabletId = tabletStat.TabletId - def tabletBackend; - for (def be : beNodes) { - if (be.BackendId == tabletBackendId) { - tabletBackend = be - break; - } - } - logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); - - - try { - GetDebugPoint().clearDebugPointsForAllFEs() - GetDebugPoint().clearDebugPointsForAllBEs() + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 10, + calculate_delete_bitmap_task_timeout_seconds : 15, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_mow_stale_resp_load_compaction_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" - // block the first load - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) - // the first load - t1 = Thread.start { - sql "insert into ${table1} values(1,999,999),(2,888,888);" + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); - // wait util the first load's delete bitmap update lock expired - // to ensure that the second load can take the delete bitmap update lock - // Config.delete_bitmap_lock_expiration_seconds = 10s - Thread.sleep(11 * 1000) - - // trigger full compaction on tablet - logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") - def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) - logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - Assert.assertEquals(code, 0) - def compactJson = parseJson(out.trim()) - Assert.assertEquals("success", compactJson.status.toLowerCase()) - - // wait for full compaction to complete - Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( - { - (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) - logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - Assert.assertEquals(code, 0) - def compactionStatus = parseJson(out.trim()) - Assert.assertEquals("success", compactionStatus.status.toLowerCase()) - return !compactionStatus.run_status - } - ) - order_qt_sql "select * from ${table1};" + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() - // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) - // and the coordinator BE will retry to commit the first load's txn - Thread.sleep(15 * 1000) + // block the first load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) - // let the first partial update load finish - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") - t1.join() + // the first load + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } - Thread.sleep(1000) + // wait util the first load's delete bitmap update lock expired + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + order_qt_sql "select * from ${table1};" + + + // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to commit the first load's txn + Thread.sleep(15 * 1000) + + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } - order_qt_sql "select * from ${table1};" - - } catch(Exception e) { - logger.info(e.getMessage()) - throw e - } finally { - GetDebugPoint().clearDebugPointsForAllBEs() + sql "DROP TABLE IF EXISTS ${table1};" } - - // sql "DROP TABLE IF EXISTS ${table1};" } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy index 3a1ee882616771..377ff70cf2101d 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy @@ -20,70 +20,78 @@ suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { return } - def table1 = "test_cloud_mow_stale_resp_load_load_conflict" - sql "DROP TABLE IF EXISTS ${table1} FORCE;" - sql """ CREATE TABLE IF NOT EXISTS ${table1} ( - `k1` int NOT NULL, - `c1` int, - `c2` int - )UNIQUE KEY(k1) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "disable_auto_compaction" = "true", - "replication_num" = "1"); """ - - sql "insert into ${table1} values(1,1,1);" - sql "insert into ${table1} values(2,2,2);" - sql "insert into ${table1} values(3,3,3);" - sql "sync;" - order_qt_sql "select * from ${table1};" - - try { - GetDebugPoint().clearDebugPointsForAllFEs() - GetDebugPoint().clearDebugPointsForAllBEs() - - // block the first load - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) - - // the first load - t1 = Thread.start { - sql "insert into ${table1} values(1,999,999),(2,888,888);" - } + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 10, + calculate_delete_bitmap_task_timeout_seconds : 15, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_mow_stale_resp_load_load_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" - // wait util the first load's delete bitmap update lock expired - // to ensure that the second load can take the delete bitmap update lock - // Config.delete_bitmap_lock_expiration_seconds = 10s - Thread.sleep(11 * 1000) + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() - // the second load - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token2"]) - Thread.sleep(200) + // block the first load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) - sql "insert into ${table1}(k1,c1,c2) values(1,666,666),(2,555,555);" + // the first load + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } - order_qt_sql "select * from ${table1};" + // wait util the first load's delete bitmap update lock expired + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + // the second load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token2"]) + Thread.sleep(200) - // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) - // and the coordinator BE will retry to commit the first load's txn - Thread.sleep(15 * 1000) + sql "insert into ${table1}(k1,c1,c2) values(1,666,666),(2,555,555);" - // let the first partial update load finish - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") - t1.join() + order_qt_sql "select * from ${table1};" - Thread.sleep(1000) - order_qt_sql "select * from ${table1};" - - } catch(Exception e) { - logger.info(e.getMessage()) - throw e - } finally { - GetDebugPoint().clearDebugPointsForAllBEs() - } + // keep waiting util the delete bitmap calculation timeout(Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to commit the first load's txn + Thread.sleep(15 * 1000) - sql "DROP TABLE IF EXISTS ${table1};" + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" + } } From 5a849fa620d4da31a50972df1c8dd9c7a9f3cec5 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 9 Aug 2024 18:06:17 +0800 Subject: [PATCH 10/10] replace the version of BitmapKey in cache with real rowset version --- be/src/cloud/cloud_meta_mgr.cpp | 16 +++++++++++++--- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 4 +++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 7a01590c662523..88725b177863f6 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -569,9 +569,19 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64 // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other // stats(version or compaction stats) if (status.ok() && *publish_status == PublishStatus::SUCCEED) { - // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap - tmp_delete_bitmap->remove_sentinel_marks(); - delete_bitmap->merge(*tmp_delete_bitmap); + // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap. + // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON, + // we should replace it with the rowset's real version + DCHECK(rs_meta.start_version() == rs_meta.end_version()); + int64_t rowset_version = rs_meta.start_version(); + for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) { + // skip sentinel mark, which is used for delete bitmap correctness check + if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { + delete_bitmap->merge({std::get<0>(delete_bitmap_key), + std::get<1>(delete_bitmap_key), rowset_version}, + bitmap_value); + } + } engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet->tablet_id()); } else { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index b462ce9e1bde86..75577ae2e3fee0 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -60,7 +60,9 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id); // !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks, - // when using delete bitmap from this cache, the caller should remove them manually if don't need it + // and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON. + // when using delete bitmap from this cache, the caller should manually remove these marks if don't need it + // and should replace versions in BitmapKey by the correct version Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, std::shared_ptr* publish_status);