From 3d9adda4ae4a6ca4387e339e530e26dadeb9cb6c Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 11 Feb 2025 12:20:41 +0800 Subject: [PATCH 1/5] [Fix](cloud-mow) Compaction should release delete bitmap lock when commit or abort fail --- be/src/cloud/cloud_base_compaction.cpp | 16 +- be/src/cloud/cloud_base_compaction.h | 4 +- be/src/cloud/cloud_cumulative_compaction.cpp | 16 +- be/src/cloud/cloud_cumulative_compaction.h | 4 +- be/src/cloud/cloud_full_compaction.cpp | 17 +- be/src/cloud/cloud_full_compaction.h | 3 +- be/src/cloud/cloud_meta_mgr.cpp | 13 +- be/src/cloud/cloud_meta_mgr.h | 3 +- be/src/cloud/cloud_tablet.cpp | 2 +- be/src/olap/base_compaction.h | 2 + be/src/olap/cold_data_compaction.h | 1 + be/src/olap/compaction.cpp | 28 +- be/src/olap/compaction.h | 5 +- be/src/olap/cumulative_compaction.h | 2 + be/src/olap/full_compaction.h | 2 + be/src/olap/single_replica_compaction.h | 2 + .../test_compaction_fail_release_lock.groovy | 240 ++++++++++++++++++ 17 files changed, 319 insertions(+), 41 deletions(-) create mode 100644 regression-test/suites/compaction/test_compaction_fail_release_lock.groovy diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index d053214e964a78..d6836ee16e9c56 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -330,8 +330,7 @@ Status CloudBaseCompaction::modify_rowsets() { DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits::max(); + int64_t initiator = this->initiator(); RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( _input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(), _stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap, @@ -403,8 +402,8 @@ Status 
CloudBaseCompaction::modify_rowsets() { return Status::OK(); } -void CloudBaseCompaction::garbage_collection() { - CloudCompactionMixin::garbage_collection(); +Status CloudBaseCompaction::garbage_collection() { + RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection()); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); idx->set_tablet_id(_tablet->tablet_id()); @@ -418,9 +417,7 @@ void CloudBaseCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { @@ -429,6 +426,7 @@ void CloudBaseCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } void CloudBaseCompaction::do_lease() { @@ -456,4 +454,8 @@ void CloudBaseCompaction::do_lease() { } } +int64_t CloudBaseCompaction::initiator() const { + return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits::max(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index 4240458f21ba87..9a2e378c18f94c 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -42,7 +42,7 @@ class CloudBaseCompaction : public CloudCompactionMixin { Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; void _filter_input_rowset(); @@ -50,6 +50,8 @@ class CloudBaseCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } + int64_t initiator() const override; + std::string _uuid; int64_t _input_segments 
= 0; int64_t _base_compaction_cnt = 0; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index c7a82b322fb82a..eb201479e10bfd 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -284,8 +284,7 @@ Status CloudCumulativeCompaction::modify_rowsets() { }); DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; - int64_t initiator = - HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits::max(); + int64_t initiator = this->initiator(); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( @@ -420,8 +419,8 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { return Status::OK(); } -void CloudCumulativeCompaction::garbage_collection() { - CloudCompactionMixin::garbage_collection(); +Status CloudCumulativeCompaction::garbage_collection() { + RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection()); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); idx->set_tablet_id(_tablet->tablet_id()); @@ -435,9 +434,7 @@ void CloudCumulativeCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { @@ -446,6 +443,7 @@ void CloudCumulativeCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } Status CloudCumulativeCompaction::pick_rowsets_to_compact() { @@ -610,5 +608,9 @@ void CloudCumulativeCompaction::do_lease() { } } 
+int64_t CloudCumulativeCompaction::initiator() const { + return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits::max(); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 87fc0b62c9c389..7d5853f866f1c4 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -44,7 +44,7 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; void update_cumulative_point(); @@ -52,6 +52,8 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } + int64_t initiator() const override; + std::string _uuid; int64_t _input_segments = 0; int64_t _max_conflict_version = 0; diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index f983e57ebe082b..98857ebcdce22f 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -227,10 +227,8 @@ Status CloudFullCompaction::modify_rowsets() { DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = - boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits::max(); - RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(initiator)); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator())); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } cloud::FinishTabletJobResponse resp; @@ -271,7 +269,7 @@ Status CloudFullCompaction::modify_rowsets() { return Status::OK(); } -void CloudFullCompaction::garbage_collection() { 
+Status CloudFullCompaction::garbage_collection() { //file_cache_garbage_collection(); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -286,9 +284,7 @@ void CloudFullCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::FULL); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = - boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { @@ -297,6 +293,7 @@ void CloudFullCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } void CloudFullCompaction::do_lease() { @@ -392,4 +389,8 @@ Status CloudFullCompaction::_cloud_full_compaction_calc_delete_bitmap( return Status::OK(); } +int64_t CloudFullCompaction::initiator() const { + return boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits::max(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index 3cbc353f6ddf6c..cb2e9358884364 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -42,7 +42,8 @@ class CloudFullCompaction : public CloudCompactionMixin { std::string_view compaction_name() const override { return "CloudFullCompaction"; } Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; + int64_t initiator() const override; private: Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator); diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 99c0805b9b92a3..1a43e17bff397b 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1201,23 +1201,22 @@ Status 
CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in return st; } -Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, - int64_t initiator) { - VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() - << ",lock_id:" << lock_id; +void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t tablet_id, int64_t lock_id, + int64_t initiator) { + LOG(INFO) << "remove_delete_bitmap_update_lock , tablet_id: " << tablet_id + << ",lock_id:" << lock_id << ",initiator:" << initiator; RemoveDeleteBitmapUpdateLockRequest req; RemoveDeleteBitmapUpdateLockResponse res; req.set_cloud_unique_id(config::cloud_unique_id); - req.set_tablet_id(tablet.tablet_id()); + req.set_tablet_id(tablet_id); req.set_lock_id(lock_id); req.set_initiator(initiator); auto st = retry_rpc("remove delete bitmap update lock", req, &res, &MetaService_Stub::remove_delete_bitmap_update_lock); if (!st.ok()) { - LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id() + LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet_id << " lock_id=" << lock_id << " st=" << st.to_string(); } - return st; } Status CloudMetaMgr::remove_old_version_delete_bitmap( diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 913ef59489a1b3..ea5a06d172fd8c 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -103,8 +103,7 @@ class CloudMetaMgr { Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator); - Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, - int64_t initiator); + void remove_delete_bitmap_update_lock(int64_t tablet_id, int64_t lock_id, int64_t initiator); Status remove_old_version_delete_bitmap( int64_t tablet_id, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 6592f17038a829..7abd8568bcb5fd 100644 --- 
a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -36,6 +36,7 @@ #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" @@ -54,7 +55,6 @@ namespace doris { #include "common/compile_check_begin.h" using namespace ErrorCode; -static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; static constexpr int LOAD_INITIATOR_ID = -1; CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta) diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 91e17e8ea5a3d5..f9ebbb181e8cc0 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -46,6 +46,8 @@ class BaseCompaction final : public CompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } + int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } + // filter input rowset in some case: // 1. 
dup key without delete predicate void _filter_input_rowset(); diff --git a/be/src/olap/cold_data_compaction.h b/be/src/olap/cold_data_compaction.h index 33647ea8c63dc4..580d412820ce9e 100644 --- a/be/src/olap/cold_data_compaction.h +++ b/be/src/olap/cold_data_compaction.h @@ -36,6 +36,7 @@ class ColdDataCompaction final : public CompactionMixin { private: std::string_view compaction_name() const override { return "cold data compaction"; } ReaderType compaction_type() const override { return ReaderType::READER_COLD_DATA_COMPACTION; } + int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } Status construct_output_rowset_writer(RowsetWriterContext& ctx) override; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 15ad98784dd8aa..5c872c5873b6bb 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1434,7 +1434,17 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); // 4. 
modify rowsets in memory - RETURN_IF_ERROR(modify_rowsets()); + auto st = modify_rowsets(); + if (!st.ok()) { + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write() && + initiator() != INVALID_COMPACTION_INITIATOR_ID) { + //release delete bitmap lock + _engine.meta_mgr().remove_delete_bitmap_update_lock( + _tablet->tablet_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator()); + } + return st; + } return Status::OK(); } @@ -1442,8 +1452,15 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { Status CloudCompactionMixin::execute_compact() { TEST_INJECTION_POINT("Compaction::do_compaction"); int64_t permits = get_compaction_permits(); - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits), - [&](const doris::Exception& ex) { garbage_collection(); }); + HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( + execute_compact_impl(permits), [&](const doris::Exception& ex) { + auto st = garbage_collection(); + if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) { + // release delete bitmap lock + _engine.meta_mgr().remove_delete_bitmap_update_lock( + _tablet->tablet_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator()); + } + }); _load_segment_to_cache(); return Status::OK(); } @@ -1488,9 +1505,9 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& return Status::OK(); } -void CloudCompactionMixin::garbage_collection() { +Status CloudCompactionMixin::garbage_collection() { if (!config::enable_file_cache) { - return; + return Status::OK(); } if (_output_rs_writer) { auto* beta_rowset_writer = dynamic_cast(_output_rs_writer.get()); @@ -1501,6 +1518,7 @@ void CloudCompactionMixin::garbage_collection() { file_cache->remove_if_cached_async(file_key); } } + return Status::OK(); } void CloudCompactionMixin::update_compaction_level() { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 775dfd72da0a6f..5c1bde04794aa0 100644 --- a/be/src/olap/compaction.h +++ 
b/be/src/olap/compaction.h @@ -41,6 +41,8 @@ struct RowsetWriterContext; class StorageEngine; class CloudStorageEngine; +static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; +static constexpr int64_t INVALID_COMPACTION_INITIATOR_ID = -100; // This class is a base class for compaction. // The entrance of this class is compact() // Any compaction should go through four procedures. @@ -63,6 +65,7 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; + virtual int64_t initiator() const = 0; protected: Status merge_input_rowsets(); @@ -195,7 +198,7 @@ class CloudCompactionMixin : public Compaction { Status update_delete_bitmap() override; - virtual void garbage_collection(); + virtual Status garbage_collection(); CloudStorageEngine& _engine; diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index 276e3b3490311c..8ee4ec9cc11782 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -42,6 +42,8 @@ class CumulativeCompaction final : public CompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } + int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } + Status pick_rowsets_to_compact(); void find_longest_consecutive_version(std::vector* rowsets, diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h index cb11ac70aa331f..204c7f2b1d4a51 100644 --- a/be/src/olap/full_compaction.h +++ b/be/src/olap/full_compaction.h @@ -47,6 +47,8 @@ class FullCompaction final : public CompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } + int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } + Status _check_all_version(const std::vector& rowsets); Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, RowsetWriter* rowset_writer); diff --git 
a/be/src/olap/single_replica_compaction.h b/be/src/olap/single_replica_compaction.h index 10ec65ec3f0570..c2253b7c037ff3 100644 --- a/be/src/olap/single_replica_compaction.h +++ b/be/src/olap/single_replica_compaction.h @@ -47,6 +47,8 @@ class SingleReplicaCompaction final : public CompactionMixin { : ReaderType::READER_BASE_COMPACTION; } + int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } + private: Status _do_single_replica_compaction(); Status _do_single_replica_compaction_impl(); diff --git a/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy new file mode 100644 index 00000000000000..84beb50278fd4a --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_compaction_fail_release_lock", "nonConcurrent") { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if 
(compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def testTable = "test_compaction_fail_release_lock" + def timeout = 10000 + sql """ DROP TABLE IF EXISTS ${testTable}""" + def testTableDDL = """ + create table 
${testTable} + ( + `plan_id` bigint(20) NOT NULL, + `target_id` int(20) NOT NULL, + `target_name` varchar(255) NOT NULL + ) + ENGINE=OLAP + UNIQUE KEY(`plan_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`plan_id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + sql testTableDDL + sql "sync" + + // store the original value + get_be_param("compaction_promotion_version_count") + set_be_param("compaction_promotion_version_count", "5") + + try { + // 1. test normal + sql "sync" + sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """ + + qt_sql "select * from ${testTable} order by plan_id" + + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") + + // trigger cu compaction, compaction will commit fail + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", 
tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + } + + def now = System.currentTimeMillis() + + // insert will done before timeout + sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """ + + def time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") + + qt_sql "select * from ${testTable} order by plan_id" + + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") + + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") + + // trigger cu compaction, compaction will abort + tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + 
getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + } + + // insert will done before timeout + + now = System.currentTimeMillis() + + sql """ INSERT INTO ${testTable} VALUES (0,0,'14'),(1,19,'19'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'15'),(1,20,'20'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'16'),(1,21,'21'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'17'),(1,22,'22'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'18'),(1,23,'23'); """ + + time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") + + qt_sql "select * from ${testTable} order by plan_id" + + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") + } finally { + reset_be_param("compaction_promotion_version_count") + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") + } + +} From 00c9e9d94fcb822ab584d1aa55e4c9745e030915 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 11 Feb 2025 15:50:45 +0800 Subject: [PATCH 2/5] edit --- be/src/cloud/cloud_cumulative_compaction.cpp | 14 ++++ be/src/cloud/cloud_meta_mgr.cpp | 14 ++-- be/src/cloud/cloud_meta_mgr.h | 3 +- be/src/cloud/config.cpp | 2 +- be/src/cloud/config.h | 2 +- be/src/olap/compaction.cpp | 15 +--- .../test_compaction_fail_release_lock.out | 23 +++++++ .../test_compaction_fail_release_lock.groovy | 68 ++++--------------- 8 files changed, 65 insertions(+), 76 deletions(-) create mode 100644 regression-test/data/compaction/test_compaction_fail_release_lock.out diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index eb201479e10bfd..d0c4d8ce14ea74 100644 --- 
a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -304,6 +304,13 @@ Status CloudCumulativeCompaction::modify_rowsets() { compaction_job->set_delete_bitmap_lock_initiator(initiator); } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", { + LOG(INFO) << "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id" + << cloud_tablet()->tablet_id(); + return Status::InternalError( + "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id {}", + cloud_tablet()->tablet_id()); + }); cloud::FinishTabletJobResponse resp; auto st = _engine.meta_mgr().commit_tablet_job(job, &resp); if (resp.has_alter_version()) { @@ -436,6 +443,13 @@ Status CloudCumulativeCompaction::garbage_collection() { _tablet->enable_unique_key_merge_on_write()) { compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", { + LOG(INFO) << "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id" + << cloud_tablet()->tablet_id(); + return Status::InternalError( + "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id {}", + cloud_tablet()->tablet_id()); + }); auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { LOG_WARNING("failed to abort compaction job") diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 1a43e17bff397b..30e29d74f0402a 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1201,21 +1201,23 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in return st; } -void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t tablet_id, int64_t lock_id, - int64_t initiator) { - LOG(INFO) << "remove_delete_bitmap_update_lock , tablet_id: " << tablet_id - << ",lock_id:" << lock_id << ",initiator:" << initiator; +void 
CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, + int64_t initiator, int64_t tablet_id) { + LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id + << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id; RemoveDeleteBitmapUpdateLockRequest req; RemoveDeleteBitmapUpdateLockResponse res; req.set_cloud_unique_id(config::cloud_unique_id); + req.set_table_id(table_id); req.set_tablet_id(tablet_id); req.set_lock_id(lock_id); req.set_initiator(initiator); auto st = retry_rpc("remove delete bitmap update lock", req, &res, &MetaService_Stub::remove_delete_bitmap_update_lock); if (!st.ok()) { - LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet_id - << " lock_id=" << lock_id << " st=" << st.to_string(); + LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id + << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id + << ",st=" << st.to_string(); } } diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index ea5a06d172fd8c..f0a1b1a664887e 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -103,7 +103,8 @@ class CloudMetaMgr { Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator); - void remove_delete_bitmap_update_lock(int64_t tablet_id, int64_t lock_id, int64_t initiator); + void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator, + int64_t tablet_id); Status remove_old_version_delete_bitmap( int64_t tablet_id, diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 141c7a9b170eb1..54d8424345a723 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -66,7 +66,7 @@ DEFINE_mInt32(sync_load_for_tablets_thread, "32"); DEFINE_mBool(enable_new_tablet_do_compaction, "false"); -DEFINE_Int32(delete_bitmap_lock_expiration_seconds, "10"); +DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, 
"10"); DEFINE_Bool(enable_cloud_txn_lazy_commit, "false"); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 50f058bf8b0c79..f79038662ef89d 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -98,7 +98,7 @@ DECLARE_mBool(save_load_error_log_to_s3); // the theads which sync the datas which loaded in other clusters DECLARE_mInt32(sync_load_for_tablets_thread); -DECLARE_Int32(delete_bitmap_lock_expiration_seconds); +DECLARE_mInt32(delete_bitmap_lock_expiration_seconds); // enable large txn lazy commit in meta-service `commit_txn` DECLARE_mBool(enable_cloud_txn_lazy_commit); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 5c872c5873b6bb..fd35353fca272c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1434,17 +1434,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); // 4. modify rowsets in memory - auto st = modify_rowsets(); - if (!st.ok()) { - if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write() && - initiator() != INVALID_COMPACTION_INITIATOR_ID) { - //release delete bitmap lock - _engine.meta_mgr().remove_delete_bitmap_update_lock( - _tablet->tablet_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator()); - } - return st; - } + RETURN_IF_ERROR(modify_rowsets()); return Status::OK(); } @@ -1458,7 +1448,8 @@ Status CloudCompactionMixin::execute_compact() { if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) { // release delete bitmap lock _engine.meta_mgr().remove_delete_bitmap_update_lock( - _tablet->tablet_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator()); + _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), + _tablet->tablet_id()); } }); _load_segment_to_cache(); diff --git a/regression-test/data/compaction/test_compaction_fail_release_lock.out 
b/regression-test/data/compaction/test_compaction_fail_release_lock.out new file mode 100644 index 00000000000000..dcbfab2d3539eb --- /dev/null +++ b/regression-test/data/compaction/test_compaction_fail_release_lock.out @@ -0,0 +1,23 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 + +-- !sql -- +0 0 13 +1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 + diff --git a/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy index 84beb50278fd4a..e8763446b9d5d0 100644 --- a/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy +++ b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -import org.codehaus.groovy.runtime.IOGroovyMethods - suite("test_compaction_fail_release_lock", "nonConcurrent") { def backendId_to_backendIP = [:] def backendId_to_backendHttpPort = [:] @@ -120,10 +118,15 @@ suite("test_compaction_fail_release_lock", "nonConcurrent") { } while (running) } - def testTable = "test_compaction_fail_release_lock" - def timeout = 10000 - sql """ DROP TABLE IF EXISTS ${testTable}""" - def testTableDDL = """ + // store the original value + get_be_param("delete_bitmap_lock_expiration_seconds") + set_be_param("delete_bitmap_lock_expiration_seconds", "60") + + try { + def testTable = "test_compaction_fail_release_lock" + def timeout = 10000 + sql """ DROP TABLE IF EXISTS ${testTable}""" + def testTableDDL = """ create table ${testTable} ( `plan_id` bigint(20) NOT NULL, @@ -140,15 +143,8 @@ suite("test_compaction_fail_release_lock", "nonConcurrent") { "disable_auto_compaction" = "true" ); """ - sql testTableDDL - sql "sync" + sql testTableDDL - // store the original value - 
get_be_param("compaction_promotion_version_count") - set_be_param("compaction_promotion_version_count", "5") - - try { - // 1. test normal sql "sync" sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ @@ -161,7 +157,7 @@ suite("test_compaction_fail_release_lock", "nonConcurrent") { qt_sql "select * from ${testTable} order by plan_id" - GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed") // trigger cu compaction, compaction will commit fail def tablets = sql_return_maparray """ show tablets from ${testTable}; """ @@ -194,47 +190,9 @@ suite("test_compaction_fail_release_lock", "nonConcurrent") { qt_sql "select * from ${testTable} order by plan_id" - GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") - - GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") - - // trigger cu compaction, compaction will abort - tablets = sql_return_maparray """ show tablets from ${testTable}; """ - logger.info("tablets: " + tablets) - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ - logger.info("tablet: " + tablet_info) - String trigger_backend_id = tablet.BackendId - getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); - - assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], - "cumulative", tablet_id).contains("Success")); - waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) - getTabletStatus(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id); - } - - // insert will done before timeout - - now = System.currentTimeMillis() - - sql """ INSERT INTO ${testTable} VALUES (0,0,'14'),(1,19,'19'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'15'),(1,20,'20'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'16'),(1,21,'21'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'17'),(1,22,'22'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'18'),(1,23,'23'); """ - - time_diff = System.currentTimeMillis() - now - logger.info("time_diff:" + time_diff) - assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") - - qt_sql "select * from ${testTable} order by plan_id" - - GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") } finally { - reset_be_param("compaction_promotion_version_count") - GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.commit_job_failed") - GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.throw_exception") + reset_be_param("delete_bitmap_lock_expiration_seconds") + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed") } } From 883f55352679e9335c5712a09a717baeef1256a7 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 11 Feb 2025 18:55:45 +0800 Subject: [PATCH 3/5] edit ms --- cloud/src/meta-service/meta_service.cpp | 65 +++++++++++++++------ cloud/src/meta-service/meta_service_job.cpp | 4 +- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index f0d4b9f4861a7d..ef4e1877ca4d0b 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1747,11 +1747,11 @@ void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* contro } static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& 
msg, std::stringstream& ss, - std::unique_ptr& txn, std::string& instance_id, - int64_t table_id, int64_t lock_id, int64_t lock_initiator) { - std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + std::unique_ptr& txn, int64_t table_id, + int64_t lock_id, int64_t lock_initiator, std::string& lock_key, + DeleteBitmapUpdateLockPB& lock_info) { std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; + LOG(INFO) << "check_delete_bitmap_lock, table_id=" << table_id << " key=" << hex(lock_key); auto err = txn->get(lock_key, &lock_val); TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", &err); if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { @@ -1865,8 +1865,10 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont bool unlock = request->has_unlock() ? request->unlock() : false; if (!unlock) { // 1. Check whether the lock expires - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), - request->initiator())) { + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info)) { LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; @@ -1961,8 +1963,11 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont return; } if (!unlock) { - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, - request->lock_id(), request->initiator())) { + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, 
lock_info)) { LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; @@ -2344,8 +2349,9 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms", request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), - request->initiator())) { + DeleteBitmapUpdateLockPB lock_info_tmp; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info_tmp)) { LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " code=" << code @@ -2381,17 +2387,42 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock( msg = "failed to init txn"; return; } - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), - request->lock_id(), request->initiator())) { + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, request->table_id(), request->lock_id(), + request->initiator(), lock_key, lock_info)) { LOG(WARNING) << "failed to check delete bitmap tablet lock" << " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id() << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; return; } - std::string lock_key = - meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); - txn->remove(lock_key); + auto initiators = lock_info.mutable_initiators(); + for (auto iter = 
initiators->begin(); iter != initiators->end(); iter++) { + if (*iter == request->initiator()) { + initiators->erase(iter); + break; + } + } + if (initiators->empty()) { + LOG(INFO) << "remove delete bitmap lock, table_id=" << request->table_id() + << " lock_id=" << request->lock_id() << " key=" << hex(lock_key); + txn->remove(lock_key); + } else { + lock_info.SerializeToString(&lock_val); + if (lock_val.empty()) { + LOG(WARNING) << "failed to seiralize lock_info, table_id=" << request->table_id() + << " key=" << hex(lock_key); + return; + } + LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << request->table_id() + << ", key=" << hex(lock_key) << " lock_id=" << request->lock_id() + << " initiator=" << request->initiator() + << " initiators_size=" << lock_info.initiators_size(); + txn->put(lock_key, lock_val); + } err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -2399,10 +2430,6 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock( msg = ss.str(); return; } - - LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id() - << " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id() - << ", key=" << hex(lock_key) << ", initiator=" << request->initiator(); } void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 173c6834b5f1e6..5299b85f41d9a9 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -458,7 +458,7 @@ static bool check_and_remove_delete_bitmap_update_lock(MetaServiceCode& code, st std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); std::string lock_val; TxnErrorCode err = txn->get(lock_key, &lock_val); - LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id + LOG(INFO) << "get remove delete bitmap update lock info, 
table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; if (err != TxnErrorCode::TXN_OK) { ss << "failed to get delete bitmap update lock key, instance_id=" << instance_id @@ -520,7 +520,7 @@ static void remove_delete_bitmap_update_lock(std::unique_ptr& txn, std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); std::string lock_val; TxnErrorCode err = txn->get(lock_key, &lock_val); - LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id + LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get delete bitmap update lock key, instance_id=" << instance_id From 95f610851e958d2d67c07fc2e2e00f375efb79b2 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Wed, 12 Feb 2025 09:53:26 +0800 Subject: [PATCH 4/5] edit --- be/src/olap/base_compaction.h | 2 -- be/src/olap/cold_data_compaction.h | 1 - be/src/olap/compaction.h | 2 +- be/src/olap/cumulative_compaction.h | 2 -- be/src/olap/full_compaction.h | 2 -- be/src/olap/single_replica_compaction.h | 2 -- 6 files changed, 1 insertion(+), 10 deletions(-) diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index f9ebbb181e8cc0..91e17e8ea5a3d5 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -46,8 +46,6 @@ class BaseCompaction final : public CompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } - int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } - // filter input rowset in some case: // 1. 
dup key without delete predicate void _filter_input_rowset(); diff --git a/be/src/olap/cold_data_compaction.h b/be/src/olap/cold_data_compaction.h index 580d412820ce9e..33647ea8c63dc4 100644 --- a/be/src/olap/cold_data_compaction.h +++ b/be/src/olap/cold_data_compaction.h @@ -36,7 +36,6 @@ class ColdDataCompaction final : public CompactionMixin { private: std::string_view compaction_name() const override { return "cold data compaction"; } ReaderType compaction_type() const override { return ReaderType::READER_COLD_DATA_COMPACTION; } - int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } Status construct_output_rowset_writer(RowsetWriterContext& ctx) override; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 5c1bde04794aa0..9938311a777b17 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -65,7 +65,7 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; - virtual int64_t initiator() const = 0; + virtual int64_t initiator() const { return INVALID_COMPACTION_INITIATOR_ID; } protected: Status merge_input_rowsets(); diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index 8ee4ec9cc11782..276e3b3490311c 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -42,8 +42,6 @@ class CumulativeCompaction final : public CompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } - int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } - Status pick_rowsets_to_compact(); void find_longest_consecutive_version(std::vector* rowsets, diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h index 204c7f2b1d4a51..cb11ac70aa331f 100644 --- a/be/src/olap/full_compaction.h +++ b/be/src/olap/full_compaction.h @@ -47,8 +47,6 @@ class FullCompaction final : public CompactionMixin { 
ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } - int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } - Status _check_all_version(const std::vector& rowsets); Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, RowsetWriter* rowset_writer); diff --git a/be/src/olap/single_replica_compaction.h b/be/src/olap/single_replica_compaction.h index c2253b7c037ff3..10ec65ec3f0570 100644 --- a/be/src/olap/single_replica_compaction.h +++ b/be/src/olap/single_replica_compaction.h @@ -47,8 +47,6 @@ class SingleReplicaCompaction final : public CompactionMixin { : ReaderType::READER_BASE_COMPACTION; } - int64_t initiator() const override { return INVALID_COMPACTION_INITIATOR_ID; } - private: Status _do_single_replica_compaction(); Status _do_single_replica_compaction_impl(); From c24be19faeb20d21108f299b3afa6f4016f57b7f Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Wed, 12 Feb 2025 18:20:57 +0800 Subject: [PATCH 5/5] edit --- be/src/cloud/cloud_base_compaction.cpp | 11 +---------- be/src/cloud/cloud_base_compaction.h | 3 --- be/src/cloud/cloud_cumulative_compaction.cpp | 11 +---------- be/src/cloud/cloud_cumulative_compaction.h | 3 --- be/src/cloud/cloud_full_compaction.cpp | 11 +---------- be/src/cloud/cloud_full_compaction.h | 2 -- be/src/olap/compaction.cpp | 16 ++++++++++++++-- be/src/olap/compaction.h | 7 ++++++- cloud/src/meta-service/meta_service.cpp | 8 +++++++- 9 files changed, 30 insertions(+), 42 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index d6836ee16e9c56..ce105d89b3ffa2 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -38,12 +38,7 @@ bvar::Adder base_output_size("base_compaction", "output_size"); CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudBaseCompaction::~CloudBaseCompaction() = default; @@ -454,8 +449,4 @@ void CloudBaseCompaction::do_lease() { } } -int64_t CloudBaseCompaction::initiator() const { - return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits::max(); -} - } // namespace doris diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index 9a2e378c18f94c..b9f52922b8e29f 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -50,9 +50,6 @@ class CloudBaseCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } - int64_t initiator() const override; - - std::string _uuid; int64_t _input_segments = 0; int64_t _base_compaction_cnt = 0; int64_t _cumulative_compaction_cnt = 0; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index d0c4d8ce14ea74..d2a3af3ec1ba52 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -41,12 +41,7 @@ bvar::Adder cumu_output_size("cumu_compaction", "output_size"); CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; @@ -622,9 +617,5 @@ void CloudCumulativeCompaction::do_lease() { } } -int64_t CloudCumulativeCompaction::initiator() const { - return HashUtil::hash64(_uuid.data(), 
_uuid.size(), 0) & std::numeric_limits::max(); -} - #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 7d5853f866f1c4..1e50cd93215d4b 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -52,9 +52,6 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } - int64_t initiator() const override; - - std::string _uuid; int64_t _input_segments = 0; int64_t _max_conflict_version = 0; // Snapshot values when pick input rowsets diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 98857ebcdce22f..b7dde4266df2b6 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -42,12 +42,7 @@ bvar::Adder full_output_size("full_compaction", "output_size"); CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudFullCompaction::~CloudFullCompaction() = default; @@ -389,8 +384,4 @@ Status CloudFullCompaction::_cloud_full_compaction_calc_delete_bitmap( return Status::OK(); } -int64_t CloudFullCompaction::initiator() const { - return boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits::max(); -} - } // namespace doris diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index cb2e9358884364..882fd75292674e 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -43,7 +43,6 @@ class CloudFullCompaction : public 
CloudCompactionMixin { Status modify_rowsets() override; Status garbage_collection() override; - int64_t initiator() const override; private: Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator); @@ -53,7 +52,6 @@ class CloudFullCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } - std::string _uuid; int64_t _input_segments = 0; // Snapshot values when pick input rowsets int64_t _base_compaction_cnt = 0; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index fd35353fca272c..efd3a104aea20e 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1404,7 +1404,12 @@ int64_t CloudCompactionMixin::get_compaction_permits() { CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabletSPtr tablet, const std::string& label) - : Compaction(tablet, label), _engine(engine) {} + : Compaction(tablet, label), _engine(engine) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; @@ -1439,6 +1444,10 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { return Status::OK(); } +int64_t CloudCompactionMixin::initiator() const { + return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits::max(); +} + Status CloudCompactionMixin::execute_compact() { TEST_INJECTION_POINT("Compaction::do_compaction"); int64_t permits = get_compaction_permits(); @@ -1446,7 +1455,10 @@ Status CloudCompactionMixin::execute_compact() { execute_compact_impl(permits), [&](const doris::Exception& ex) { auto st = garbage_collection(); if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) { - // release delete bitmap lock + // if compaction fail, be will try to abort compaction, and delete bitmap lock + // will release if abort job successfully, but if abort 
failed, delete bitmap + // lock will not release, in this situation, be need to send this rpc to ms + // to try to release delete bitmap lock. _engine.meta_mgr().remove_delete_bitmap_update_lock( _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), _tablet->tablet_id()); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 9938311a777b17..f656adf5175248 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -65,7 +65,6 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; - virtual int64_t initiator() const { return INVALID_COMPACTION_INITIATOR_ID; } protected: Status merge_input_rowsets(); @@ -153,6 +152,8 @@ class CompactionMixin : public Compaction { int64_t get_compaction_permits(); + int64_t initiator() const { return INVALID_COMPACTION_INITIATOR_ID; } + protected: // Convert `_tablet` from `BaseTablet` to `Tablet` Tablet* tablet(); @@ -193,6 +194,8 @@ class CloudCompactionMixin : public Compaction { Status execute_compact() override; + int64_t initiator() const; + protected: CloudTablet* cloud_tablet() { return static_cast(_tablet.get()); } @@ -202,6 +205,8 @@ class CloudCompactionMixin : public Compaction { CloudStorageEngine& _engine; + std::string _uuid; + int64_t _expiration = 0; private: diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index ef4e1877ca4d0b..bc8af94496a0cf 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -2399,14 +2399,20 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock( << " request initiator=" << request->initiator() << " msg " << msg; return; } + bool modify_initiators = false; auto initiators = lock_info.mutable_initiators(); for (auto iter = initiators->begin(); iter != initiators->end(); iter++) { if (*iter == request->initiator()) { initiators->erase(iter); + modify_initiators = true; break; } } - if 
(initiators->empty()) { + if (!modify_initiators) { + LOG(INFO) << "initiators don't have initiator=" << request->initiator() + << ",initiators_size=" << lock_info.initiators_size() << ",just return"; + return; + } else if (initiators->empty()) { LOG(INFO) << "remove delete bitmap lock, table_id=" << request->table_id() << " lock_id=" << request->lock_id() << " key=" << hex(lock_key); txn->remove(lock_key);