diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 7bbd602f571ede..8a6378794e92d5 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -2052,6 +2052,7 @@ void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskReq finish_task_request.__set_signature(req.signature); finish_task_request.__set_report_version(s_report_version); finish_task_request.__set_error_tablet_ids(error_tablet_ids); + finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 5d1a957d14df19..b6c9aa318f387c 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -186,9 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { std::shared_ptr partial_update_info; std::shared_ptr publish_status; int64_t txn_expiration; + TxnPublishInfo previous_publish_info; Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info( _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration, - &partial_update_info, &publish_status); + &partial_update_info, &publish_status, &previous_publish_info); if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. 
tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << status; @@ -204,8 +205,19 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { txn_info.rowset_ids = rowset_ids; txn_info.partial_update_info = partial_update_info; txn_info.publish_status = publish_status; + txn_info.publish_info = {.publish_version = _version, + .base_compaction_cnt = _ms_base_compaction_cnt, + .cumulative_compaction_cnt = _ms_cumulative_compaction_cnt, + .cumulative_point = _ms_cumulative_point}; auto update_delete_bitmap_time_us = 0; - if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) { + if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) && + _version == previous_publish_info.publish_version && + _ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt && + _ms_cumulative_compaction_cnt == previous_publish_info.cumulative_compaction_cnt && + _ms_cumulative_point == previous_publish_info.cumulative_point) { + // if version or compaction stats can't match, it means that this is a retry and there are + // compaction or other loads finished successfully on the same tablet. 
So the previous publish + // is stale and we should re-calculate the delete bitmap LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id << ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap."; } else { diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index e743ea9b12c8ce..88725b177863f6 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -559,14 +559,29 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64 } txn_processed.insert(txn_id); DeleteBitmapPtr tmp_delete_bitmap; - RowsetIdUnorderedSet tmp_rowset_ids; std::shared_ptr publish_status = std::make_shared(PublishStatus::INIT); CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( - txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status); - if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) { - delete_bitmap->merge(*tmp_delete_bitmap); + txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status); + // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services. + // If the control flow reaches here, it's guaranteed that the rowsets are committed in meta services, so we can + // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other + // stats(version or compaction stats) + if (status.ok() && *publish_status == PublishStatus::SUCCEED) { + // tmp_delete_bitmap contains sentinel marks, we should remove them before merging it into delete bitmap. 
+ // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON, + // we should replace it with the rowset's real version + DCHECK(rs_meta.start_version() == rs_meta.end_version()); + int64_t rowset_version = rs_meta.start_version(); + for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) { + // skip sentinel mark, which is used for delete bitmap correctness check + if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { + delete_bitmap->merge({std::get<0>(delete_bitmap_key), + std::get<1>(delete_bitmap_key), rowset_version}, + bitmap_value); + } + } engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet->tablet_id()); } else { diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 17ec1fe22b0d85..e7a5871e81b150 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -648,8 +648,13 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get())); - _engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED); + + // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, + // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do + // delete bitmap correctness check. 
If we store the new_delete_bitmap, the delete bitmap correctness check will fail + _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, + cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_info); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 583992e76f7aba..c6a3b54edc3f67 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -27,6 +27,7 @@ #include "cpp/sync_point.h" #include "olap/olap_common.h" #include "olap/tablet_meta.h" +#include "olap/txn_manager.h" namespace doris { @@ -54,7 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status) { + std::shared_ptr* publish_status, TxnPublishInfo* previous_publish_info) { { std::shared_lock rlock(_rwlock); TxnKey key(transaction_id, tablet_id); @@ -68,6 +69,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( *txn_expiration = iter->second.txn_expiration; *partial_update_info = iter->second.partial_update_info; *publish_status = iter->second.publish_status; + *previous_publish_info = iter->second.publish_info; } RETURN_IF_ERROR( get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr)); @@ -96,7 +98,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap( handle == nullptr ? 
nullptr : reinterpret_cast(value(handle)); if (val) { *delete_bitmap = val->delete_bitmap; - *rowset_ids = val->rowset_ids; + if (rowset_ids) { + *rowset_ids = val->rowset_ids; + } // must call release handle to reduce the reference count, // otherwise there will be memory leak release(handle); @@ -153,12 +157,17 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status) { + PublishStatus publish_status, + TxnPublishInfo publish_info) { { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); - CHECK(_txn_map.count(txn_key) > 0); - *(_txn_map[txn_key].publish_status.get()) = publish_status; + CHECK(_txn_map.contains(txn_key)); + TxnVal& txn_val = _txn_map[txn_key]; + *(txn_val.publish_status) = publish_status; + if (publish_status == PublishStatus::SUCCEED) { + txn_val.publish_info = publish_info; + } } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); CacheKey key(key_str); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 5012db6b8e5bf3..75577ae2e3fee0 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -42,7 +42,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration, std::shared_ptr* partial_update_info, - std::shared_ptr* publish_status); + std::shared_ptr* publish_status, + TxnPublishInfo* previous_publish_info); void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, @@ -52,12 +53,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { void update_tablet_txn_info(TTransactionId transaction_id, int64_t 
tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status); + PublishStatus publish_status, TxnPublishInfo publish_info = {}); void remove_expired_tablet_txn_info(); void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id); + // !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks, + // and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON. + // when using delete bitmap from this cache, the caller should manually remove these marks if it doesn't need them + // and should replace versions in BitmapKey with the correct version Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, std::shared_ptr* publish_status); @@ -88,6 +93,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { int64_t txn_expiration; std::shared_ptr partial_update_info; std::shared_ptr publish_status = nullptr; + // used to determine if the retry needs to re-calculate the delete bitmap + TxnPublishInfo publish_info; TxnVal() : txn_expiration(0) {}; TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_, std::shared_ptr partial_update_info_, diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 4ca36684383939..0fb12dd074f8b0 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1208,17 +1208,6 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap return Status::OK(); } -void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap) { - for (auto it = delete_bitmap->delete_bitmap.begin(), end = delete_bitmap->delete_bitmap.end(); - it != end;) { - if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) { - it = delete_bitmap->delete_bitmap.erase(it); - } else { - ++it; - } - } -} - Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, 
TabletTxnInfo* txn_info, int64_t txn_id, int64_t txn_expiration) { SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); @@ -1296,6 +1285,21 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf } } + DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "BaseTablet::update_delete_bitmap.block"); + if (block_dp) { + auto wait_token = block_dp->param("wait_token", ""); + if (wait_token != token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + if (!rowsets_skip_alignment.empty()) { auto token = self->calc_delete_bitmap_executor()->create_token(); // set rowset_writer to nullptr to skip the alignment process @@ -1544,7 +1548,7 @@ Status BaseTablet::update_delete_bitmap_without_lock( if (!st.ok()) { LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); } - self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); + delete_bitmap->remove_sentinel_marks(); } for (auto& iter : delete_bitmap->delete_bitmap) { self->_tablet_meta->delete_bitmap().merge( diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f958d398fd5d00..d329c786fc9781 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -289,7 +289,6 @@ class BaseTablet { static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del); - static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap); Status _capture_consistent_rowsets_unlocked(const std::vector& version_path, std::vector* rowsets) const; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index a3526781dddd87..ed9a446551d00e 100644 --- 
a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1080,6 +1080,16 @@ bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row return false; } +void DeleteBitmap::remove_sentinel_marks() { + for (auto it = delete_bitmap.begin(), end = delete_bitmap.end(); it != end;) { + if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) { + it = delete_bitmap.erase(it); + } else { + ++it; + } + } +} + int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) { std::lock_guard l(lock); auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 32c6fde568c87b..bb6b5b8cd51725 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -516,6 +516,8 @@ class DeleteBitmap { */ std::shared_ptr get_agg(const BitmapKey& bmk) const; + void remove_sentinel_marks(); + class AggCachePolicy : public LRUCachePolicyTrackingManual { public: AggCachePolicy(size_t capacity) diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5a0a74c76a2825..5944bbf0fc3136 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -63,6 +63,13 @@ enum class TxnState { }; enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 }; +struct TxnPublishInfo { + int64_t publish_version {-1}; + int64_t base_compaction_cnt {-1}; + int64_t cumulative_compaction_cnt {-1}; + int64_t cumulative_point {-1}; +}; + struct TabletTxnInfo { PUniqueId load_id; RowsetSharedPtr rowset; @@ -74,24 +81,33 @@ struct TabletTxnInfo { int64_t creation_time; bool ingest {false}; std::shared_ptr partial_update_info; + + // for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask + // needs to re-calculate the delete bitmap std::shared_ptr publish_status; - TxnState state {TxnState::PREPARED}; + TxnPublishInfo publish_info; + TxnState state {TxnState::PREPARED}; TabletTxnInfo() = default; 
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} + : load_id(std::move(load_id)), + rowset(std::move(rowset)), + creation_time(UnixSeconds()) {} TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} + : load_id(std::move(load_id)), + rowset(std::move(rowset)), + creation_time(UnixSeconds()), + ingest(ingest_arg) {} TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, - DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) - : load_id(load_id), - rowset(rowset), + DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids) + : load_id(std::move(load_id)), + rowset(std::move(rowset)), unique_key_merge_on_write(merge_on_write), - delete_bitmap(delete_bitmap), - rowset_ids(ids), + delete_bitmap(std::move(delete_bitmap)), + rowset_ids(std::move(ids)), creation_time(UnixSeconds()) {} void prepare() { state = TxnState::PREPARED; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 0eef0c684d6062..4e01f3a5058774 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -671,6 +671,17 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize() + ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString()); + } else if (request.isSetRespPartitions() + && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) { + LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's" + + "partitionInfos: {}. 
response's partitionInfos: {}", task.getBackendId(), + request.getReportVersion(), + calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos().toString(), + request.getRespPartitions().toString()); + // DELETE_BITMAP_LOCK_ERROR will be retried + calcDeleteBitmapTask.countDownToZero(TStatusCode.DELETE_BITMAP_LOCK_ERROR, + "get staled response from backend " + task.getBackendId() + ", report version: " + + request.getReportVersion()); } else { calcDeleteBitmapTask.countDownLatch(task.getBackendId(), calcDeleteBitmapTask.getTransactionId()); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java index 4188cf61849a91..49a653c7a32c26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java @@ -79,6 +79,10 @@ public void countDownToZero(TStatusCode code, String errMsg) { } } + public boolean isFinishRequestStale(List respPartitionInfos) { + return !respPartitionInfos.equals(partitionInfos); + } + public void setLatch(MarkedCountDownLatch latch) { this.latch = latch; } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 1db7a109f55078..ecedf0ee1afad5 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -72,6 +72,8 @@ struct TFinishTaskRequest { 17: optional map succ_tablets 18: optional map table_id_to_delta_num_rows 19: optional map> table_id_to_tablet_id_to_delta_num_rows + // for Cloud mow table only, used by FE to check if the response is for the latest request + 20: optional list resp_partitions; } struct TTablet { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out new file mode 100644 index 
00000000000000..09882a909b391b --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 999 999 +2 888 888 +3 3 3 + diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out new file mode 100644 index 00000000000000..6fd2178fd94ac9 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 666 666 +2 555 555 +3 3 3 + +-- !sql -- +1 999 999 +2 888 888 +3 3 3 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy new file mode 100644 index 00000000000000..8f4fa45700b81f --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_stale_resp_load_compaction_conflict", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 10, + calculate_delete_bitmap_task_timeout_seconds : 15, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_mow_stale_resp_load_compaction_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the first load + 
GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) + + // the first load + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } + + // wait until the first load's delete bitmap update lock expires + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + order_qt_sql "select * from ${table1};" + + + // keep waiting until the delete bitmap calculation times out (Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to commit the first load's txn + Thread.sleep(15 * 1000) + + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + 
t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" + } +} diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy new file mode 100644 index 00000000000000..377ff70cf2101d --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 10, + calculate_delete_bitmap_task_timeout_seconds : 15, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_mow_stale_resp_load_load_conflict" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the first load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"]) + + // the first load + t1 = Thread.start { + sql "insert into ${table1} values(1,999,999),(2,888,888);" + } + + // wait until the first load's delete bitmap update lock expires + // to ensure that the second load can take the delete bitmap update lock + // Config.delete_bitmap_lock_expiration_seconds = 10s + Thread.sleep(11 * 1000) + + // the second load + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token2"]) + Thread.sleep(200) + + sql "insert into ${table1}(k1,c1,c2) values(1,666,666),(2,555,555);" + + order_qt_sql "select * from ${table1};" + + + // keep waiting until the delete bitmap calculation times out (Config.calculate_delete_bitmap_task_timeout_seconds = 15s) + // and the coordinator BE will retry to 
commit the first load's txn + Thread.sleep(15 * 1000) + + // let the first partial update load finish + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + Thread.sleep(1000) + + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" + } +}