diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6dff0a28dbcf8f..34746cb04c7cf7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1303,6 +1303,8 @@ DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60"); DEFINE_mInt32(snappy_compression_block_size, "262144"); DEFINE_mInt32(lz4_compression_block_size, "262144"); +DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false"); + DEFINE_mBool(enable_pipeline_task_leakage_detect, "false"); // clang-format off diff --git a/be/src/common/config.h b/be/src/common/config.h index a282abb37ee9f3..de159cba733b35 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1385,6 +1385,8 @@ DECLARE_mInt64(pipeline_task_leakage_detect_period_secs); DECLARE_mInt32(snappy_compression_block_size); DECLARE_mInt32(lz4_compression_block_size); +DECLARE_mBool(enable_delete_bitmap_merge_on_compaction); + DECLARE_mBool(enable_pipeline_task_leakage_detect); #ifdef BE_TEST diff --git a/be/src/http/action/delete_bitmap_action.cpp b/be/src/http/action/delete_bitmap_action.cpp new file mode 100644 index 00000000000000..d6a951ec24720d --- /dev/null +++ b/be/src/http/action/delete_bitmap_action.cpp @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "delete_bitmap_action.h" + +#include +#include +#include +#include +#include + +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/doris_metrics.h" +#include "util/stopwatch.hpp" + +namespace doris { +using namespace ErrorCode; + +namespace { + +constexpr std::string_view HEADER_JSON = "application/json"; + +} // namespace + +DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, + TPrivilegeHier::type hier, TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), _delete_bitmap_action_type(ctype) {} + +static Status _check_param(HttpRequest* req, uint64_t* tablet_id) { + const auto& req_tablet_id = req->param(TABLET_ID_KEY); + if (req_tablet_id.empty()) { + return Status::InternalError("tablet id is empty!"); + } + try { + *tablet_id = std::stoull(req_tablet_id); + } catch (const std::exception& e) { + return Status::InternalError("convert tablet_id failed, {}", e.what()); + } + return Status::OK(); +} + +Status DeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { + uint64_t tablet_id = 0; + // check & retrieve tablet_id from req if it contains + RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); + if (tablet_id == 0) { + return Status::InternalError("check param failed: missing tablet_id"); + } + + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); + } + + auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); + auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); + auto size = tablet->tablet_meta()->delete_bitmap().get_size(); + + rapidjson::Document root; + root.SetObject(); + root.AddMember("delete_bitmap_count", count, root.GetAllocator()); + root.AddMember("cardinality", cardinality, root.GetAllocator()); + root.AddMember("size", size, root.GetAllocator()); + + // to json string + rapidjson::StringBuffer strbuf; + rapidjson::PrettyWriter writer(strbuf); + root.Accept(writer); + *json_result = std::string(strbuf.GetString()); + + return Status::OK(); +} + +void DeleteBitmapAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); + if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) { + std::string json_result; + Status st = _handle_show_delete_bitmap_count(req, &json_result); + if (!st.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); + } else { + HttpChannel::send_reply(req, HttpStatus::OK, json_result); + } + } +} + +} // namespace doris diff --git a/be/src/http/action/delete_bitmap_action.h b/be/src/http/action/delete_bitmap_action.h new file mode 100644 index 00000000000000..52d1c3e35e0b38 --- /dev/null +++ b/be/src/http/action/delete_bitmap_action.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include + +#include "common/status.h" +#include "http/http_handler_with_auth.h" +#include "olap/tablet.h" + +namespace doris { +class HttpRequest; + +class ExecEnv; + +enum class DeleteBitmapActionType { COUNT_INFO = 1 }; + +/// This action is used for viewing the delete bitmap status +class DeleteBitmapAction : public HttpHandlerWithAuth { +public: + DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type ptype); + + ~DeleteBitmapAction() override = default; + + void handle(HttpRequest* req) override; + +private: + Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result); + +private: + DeleteBitmapActionType _delete_bitmap_action_type; +}; +} // namespace doris diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 782063331dfb2b..4b6594f584699d 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1064,6 +1064,13 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { _tablet->delete_expired_stale_rowset(); } + if (config::enable_delete_bitmap_merge_on_compaction && + compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && + _tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) { + process_old_version_delete_bitmap(); + } + int64_t cur_max_version = 0; { std::shared_lock rlock(_tablet->get_header_lock()); @@ -1186,6 +1193,63 @@ void Compaction::_load_segment_to_cache() { } } +void Compaction::agg_and_remove_old_version_delete_bitmap( + std::vector& pre_rowsets, + std::vector>& + to_remove_vec, + DeleteBitmapPtr& new_delete_bitmap) { + // agg previously rowset old version delete bitmap + auto pre_max_version = _output_rowset->version().second; + new_delete_bitmap = std::make_shared(_tablet->tablet_meta()->tablet_id()); + for (auto& rowset : pre_rowsets) { + if (rowset->rowset_meta()->total_disk_size() == 0) { + continue; + } + for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { + rowset->rowset_id().to_string(); + DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; + DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; + auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( + {rowset->rowset_id(), seg_id, pre_max_version}); + to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end)); + if (d->isEmpty()) { + continue; + } + new_delete_bitmap->set(end, *d); + } + } +} + +void Compaction::process_old_version_delete_bitmap() { + std::vector pre_rowsets {}; + for (const auto& it : _tablet->rowset_map()) { + if (it.first.second < _input_rowsets.front()->start_version()) { + pre_rowsets.emplace_back(it.second); + } + } + std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); + if (!pre_rowsets.empty()) { + std::vector> + to_remove_vec; + DeleteBitmapPtr new_delete_bitmap = nullptr; + agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap); + if (!new_delete_bitmap->empty()) { + // store agg delete bitmap + Version version(_input_rowsets.front()->start_version(), + _input_rowsets.back()->end_version()); + for (auto it = new_delete_bitmap->delete_bitmap.begin(); + it != new_delete_bitmap->delete_bitmap.end(); it++) { + _tablet->tablet_meta()->delete_bitmap().set(it->first, it->second); + } + _tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(), + to_remove_vec); + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", { + static_cast(_tablet.get())->delete_expired_stale_rowset(); + }); + } + } +} + #ifdef BE_TEST void Compaction::set_input_rowset(const std::vector& rowsets) { _input_rowsets = rowsets; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 5aa3e260194319..52c9b817e258fa 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -102,6 +102,14 @@ class Compaction { return _allow_delete_in_cumu_compaction; } + void agg_and_remove_old_version_delete_bitmap( + std::vector& pre_rowsets, + std::vector>& + to_remove_vec, + DeleteBitmapPtr& new_delete_bitmap); + + void process_old_version_delete_bitmap(); + private: bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const; void _load_segment_to_cache(); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 65c17eaee7e84e..8004d8a9d313ff 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -192,7 +192,8 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id() << ", rowset_ids: " << _context.mow_context->rowset_ids.size() << ", cur max_version: " << _context.mow_context->max_version - << ", transaction_id: " << _context.mow_context->txn_id + << ", transaction_id: " << _context.mow_context->txn_id << ", delete_bitmap_count: " + << _context.tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count() << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9f07f17ab8adf4..c6b12856ca0929 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -886,10 +886,13 @@ void Tablet::delete_expired_stale_rowset() { auto old_meta_size = _tablet_meta->all_stale_rs_metas().size(); // do delete operation + std::vector version_to_delete; auto to_delete_iter = stale_version_path_map.begin(); while (to_delete_iter != stale_version_path_map.end()) { std::vector& to_delete_version = to_delete_iter->second->timestamped_versions(); + int64_t start_version = -1; + int64_t end_version = -1; for (auto& timestampedVersion : to_delete_version) { auto it = _stale_rs_version_map.find(timestampedVersion->version()); if (it != _stale_rs_version_map.end()) { @@ -908,10 +911,17 @@ void Tablet::delete_expired_stale_rowset() { << timestampedVersion->version().second << "] not find in stale rs version map"; } + if (start_version < 0) { + start_version = timestampedVersion->version().first; + } + end_version = timestampedVersion->version().second; _delete_stale_rowset_by_version(timestampedVersion->version()); } + Version version(start_version, end_version); + version_to_delete.emplace_back(version.to_string()); to_delete_iter++; } + _tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete); bool reconstructed = _reconstruct_version_tracker_if_necessary(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 83a92faf903fbc..058ad30a7db880 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1048,9 +1048,12 @@ bool DeleteBitmap::empty() const { } uint64_t DeleteBitmap::cardinality() const { + std::shared_lock l(lock); uint64_t res = 0; for (auto entry : delete_bitmap) { - res += entry.second.cardinality(); + if (std::get<1>(entry.first) != DeleteBitmap::INVALID_SEGMENT_ID) { + res += entry.second.cardinality(); + } } return res; } @@ -1122,6 +1125,55 @@ void DeleteBitmap::merge(const DeleteBitmap& other) { } } +size_t DeleteBitmap::get_size() const { + std::shared_lock l(lock); + size_t charge = 0; + for (auto& [k, v] : delete_bitmap) { + if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) { + charge += v.getSizeInBytes(); + } + } + return charge; +} + +void DeleteBitmap::add_to_remove_queue( + const std::string& version_str, + const std::vector>& + vector) { + std::shared_lock l(stale_delete_bitmap_lock); + _stale_delete_bitmap.emplace(version_str, vector); +} + +void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector& vector) { + if (!config::enable_delete_bitmap_merge_on_compaction) { + return; + } + std::shared_lock l(stale_delete_bitmap_lock); + for (auto& version_str : vector) { + auto it = _stale_delete_bitmap.find(version_str); + if (it != _stale_delete_bitmap.end()) { + for (auto& delete_bitmap_tuple : it->second) { + // the key range of to be removed is [start_bmk,end_bmk) + auto start_bmk = std::get<1>(delete_bitmap_tuple); + auto end_bmk = std::get<2>(delete_bitmap_tuple); + remove(start_bmk, end_bmk); + } + _stale_delete_bitmap.erase(version_str); + } + } +} + +uint64_t DeleteBitmap::get_delete_bitmap_count() { + std::shared_lock l(lock); + uint64_t count = 0; + for (auto it = delete_bitmap.begin(); it != delete_bitmap.end(); it++) { + if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) { + count++; + } + } + return count; +} + // We cannot just copy the underlying memory to construct a string // due to equivalent objects may have different padding bytes. // Reading padding bytes is undefined behavior, neither copy nor diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 77d5554aae2eba..d241e580e34af3 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -352,6 +352,7 @@ class TabletMeta { class DeleteBitmap { public: mutable std::shared_mutex lock; + mutable std::shared_mutex stale_delete_bitmap_lock; using SegmentId = uint32_t; using Version = uint64_t; using BitmapKey = std::tuple; @@ -431,6 +432,12 @@ class DeleteBitmap { */ uint64_t cardinality() const; + /** + * return the total size of the Delete Bitmap(after serialized) + */ + + size_t get_size() const; + /** * Sets the bitmap of specific segment, it's may be insertion or replacement * @@ -499,6 +506,13 @@ class DeleteBitmap { */ std::shared_ptr get_agg(const BitmapKey& bmk) const; + void add_to_remove_queue(const std::string& version_str, + const std::vector>& vector); + void remove_stale_delete_bitmap_from_queue(const std::vector& vector); + + uint64_t get_delete_bitmap_count(); + class AggCachePolicy : public LRUCachePolicyTrackingManual { public: AggCachePolicy(size_t capacity) @@ -533,6 +547,10 @@ class DeleteBitmap { private: mutable std::shared_ptr _agg_cache; int64_t _tablet_id; + // > + std::map>> + _stale_delete_bitmap; }; static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index dc356b77731b86..6e558bc64479f0 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -37,6 +37,7 @@ #include "http/action/compaction_score_action.h" #include "http/action/config_action.h" #include "http/action/debug_point_action.h" +#include "http/action/delete_bitmap_action.h" #include "http/action/download_action.h" #include "http/action/download_binlog_action.h" #include "http/action/file_cache_action.h" @@ -288,6 +289,12 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); + DeleteBitmapAction* count_delete_bitmap_action = + _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_INFO, _env, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_local", + count_delete_bitmap_action); + ConfigAction* update_config_action = _pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG)); _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action); diff --git a/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out new file mode 100644 index 00000000000000..3c26a4852ee4d7 --- /dev/null +++ b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out @@ -0,0 +1,44 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 + +-- !sql -- +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 + +-- !sql -- +0 0 13 +1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 + +-- !sql -- +0 0 13 +1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 diff --git a/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy new file mode 100644 index 00000000000000..fb9911e2822a5f --- /dev/null +++ b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def getLocalDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_local?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + def testTable = "test_cu_compaction_remove_old_version_delete_bitmap" + def timeout = 10000 + sql """ DROP TABLE IF EXISTS ${testTable}""" + def testTableDDL = """ + create table ${testTable} + ( + `plan_id` bigint(20) NOT NULL, + `target_id` int(20) NOT NULL, + `target_name` varchar(255) NOT NULL + ) + ENGINE=OLAP + UNIQUE KEY(`plan_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`plan_id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + sql testTableDDL + sql "sync" + + // store the original value + get_be_param("compaction_promotion_version_count") + get_be_param("tablet_rowset_stale_sweep_time_sec") + set_be_param("compaction_promotion_version_count", "5") + set_be_param("tablet_rowset_stale_sweep_time_sec", "0") + + try { + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets") + // 1. test normal + sql "sync" + sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """ + + qt_sql "select * from ${testTable} order by plan_id" + + // trigger compaction to generate base rowset + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + def local_delete_bitmap_count = 0 + def local_delete_bitmap_cardinality = 0; + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + + // before compaction, delete_bitmap_count is (rowsets num - 1) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_count == 7) + assertTrue(local_delete_bitmap_cardinality == 7) + + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + } + + qt_sql "select * from ${testTable} order by plan_id" + + def now = System.currentTimeMillis() + + sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """ + + def time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") + + qt_sql "select * from ${testTable} order by plan_id" + + // trigger cu compaction to remove old version delete bitmap + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + + // before compaction, local delete_bitmap_count is (total rowsets num - 1), ms delete_bitmap_count is new rowset num + String trigger_backend_id = tablet.BackendId + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_count == 12) + assertTrue(local_delete_bitmap_cardinality == 17) + + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + + Thread.sleep(1000) + // after compaction, delete_bitmap_count is 1, cardinality is 2, check it + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_count == 1) + assertTrue(local_delete_bitmap_cardinality == 2) + } + + qt_sql "select * from ${testTable} order by plan_id" + } finally { + reset_be_param("compaction_promotion_version_count") + reset_be_param("tablet_rowset_stale_sweep_time_sec") + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets") + } + +} \ No newline at end of file