Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,8 @@ DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

// clang-format off
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,8 @@ DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

#ifdef BE_TEST
Expand Down
121 changes: 121 additions & 0 deletions be/src/http/action/delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "delete_bitmap_action.h"

#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>

#include <chrono> // IWYU pragma: keep
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"

namespace doris {
using namespace ErrorCode;

namespace {

constexpr std::string_view HEADER_JSON = "application/json";

} // namespace

DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
TPrivilegeHier::type hier, TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype), _delete_bitmap_action_type(ctype) {}

static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
const auto& req_tablet_id = req->param(TABLET_ID_KEY);
if (req_tablet_id.empty()) {
return Status::InternalError("tablet id is empty!");
}
try {
*tablet_id = std::stoull(req_tablet_id);
} catch (const std::exception& e) {
return Status::InternalError("convert tablet_id failed, {}", e.what());
}
return Status::OK();
}

Status DeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}

TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();

rapidjson::Document root;
root.SetObject();
root.AddMember("delete_bitmap_count", count, root.GetAllocator());
root.AddMember("cardinality", cardinality, root.GetAllocator());
root.AddMember("size", size, root.GetAllocator());

// to json string
rapidjson::StringBuffer strbuf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
*json_result = std::string(strbuf.GetString());

return Status::OK();
}

void DeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) {
std::string json_result;
Status st = _handle_show_delete_bitmap_count(req, &json_result);
if (!st.ok()) {
HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
} else {
HttpChannel::send_reply(req, HttpStatus::OK, json_result);
}
}
}

} // namespace doris
51 changes: 51 additions & 0 deletions be/src/http/action/delete_bitmap_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers]

Suggested change
#include <stdint.h>
#include <cstdint>


#include <string>

#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "olap/tablet.h"

namespace doris {
class HttpRequest;

class ExecEnv;

enum class DeleteBitmapActionType { COUNT_INFO = 1 };

/// This action is used for viewing the delete bitmap status
class DeleteBitmapAction : public HttpHandlerWithAuth {
public:
DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);

~DeleteBitmapAction() override = default;

void handle(HttpRequest* req) override;

private:
Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]

Suggested change
private:
Additional context

be/src/http/action/delete_bitmap_action.h:44: previously declared here

private:
^

DeleteBitmapActionType _delete_bitmap_action_type;
};
} // namespace doris
64 changes: 64 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,13 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
_tablet->delete_expired_stale_rowset();
}

if (config::enable_delete_bitmap_merge_on_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand Down Expand Up @@ -1186,6 +1193,63 @@ void Compaction::_load_segment_to_cache() {
}
}

void Compaction::agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap) {
// agg previously rowset old version delete bitmap
auto pre_max_version = _output_rowset->version().second;
new_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
}

void Compaction::process_old_version_delete_bitmap() {
std::vector<RowsetSharedPtr> pre_rowsets {};
for (const auto& it : _tablet->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<Tablet*>(_tablet.get())->delete_expired_stale_rowset();
});
}
}
}

#ifdef BE_TEST
void Compaction::set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets) {
_input_rowsets = rowsets;
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ class Compaction {
return _allow_delete_in_cumu_compaction;
}

void agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap);

void process_old_version_delete_bitmap();

private:
bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;
void _load_segment_to_cache();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id()
<< ", rowset_ids: " << _context.mow_context->rowset_ids.size()
<< ", cur max_version: " << _context.mow_context->max_version
<< ", transaction_id: " << _context.mow_context->txn_id
<< ", transaction_id: " << _context.mow_context->txn_id << ", delete_bitmap_count: "
<< _context.tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count()
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
return Status::OK();
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,13 @@ void Tablet::delete_expired_stale_rowset() {
auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();

// do delete operation
std::vector<std::string> version_to_delete;
auto to_delete_iter = stale_version_path_map.begin();
while (to_delete_iter != stale_version_path_map.end()) {
std::vector<TimestampedVersionSharedPtr>& to_delete_version =
to_delete_iter->second->timestamped_versions();
int64_t start_version = -1;
int64_t end_version = -1;
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
Expand All @@ -908,10 +911,17 @@ void Tablet::delete_expired_stale_rowset() {
<< timestampedVersion->version().second
<< "] not find in stale rs version map";
}
if (start_version < 0) {
start_version = timestampedVersion->version().first;
}
end_version = timestampedVersion->version().second;
_delete_stale_rowset_by_version(timestampedVersion->version());
}
Version version(start_version, end_version);
version_to_delete.emplace_back(version.to_string());
to_delete_iter++;
}
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);

bool reconstructed = _reconstruct_version_tracker_if_necessary();

Expand Down
54 changes: 53 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,9 +1048,12 @@ bool DeleteBitmap::empty() const {
}

uint64_t DeleteBitmap::cardinality() const {
std::shared_lock l(lock);
uint64_t res = 0;
for (auto entry : delete_bitmap) {
res += entry.second.cardinality();
if (std::get<1>(entry.first) != DeleteBitmap::INVALID_SEGMENT_ID) {
res += entry.second.cardinality();
}
}
return res;
}
Expand Down Expand Up @@ -1122,6 +1125,55 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
}
}

size_t DeleteBitmap::get_size() const {
std::shared_lock l(lock);
size_t charge = 0;
for (auto& [k, v] : delete_bitmap) {
if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
charge += v.getSizeInBytes();
}
}
return charge;
}

void DeleteBitmap::add_to_remove_queue(
const std::string& version_str,
const std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
vector) {
std::shared_lock l(stale_delete_bitmap_lock);
_stale_delete_bitmap.emplace(version_str, vector);
}

void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::string>& vector) {
if (!config::enable_delete_bitmap_merge_on_compaction) {
return;
}
std::shared_lock l(stale_delete_bitmap_lock);
for (auto& version_str : vector) {
auto it = _stale_delete_bitmap.find(version_str);
if (it != _stale_delete_bitmap.end()) {
for (auto& delete_bitmap_tuple : it->second) {
// the key range of to be removed is [start_bmk,end_bmk)
auto start_bmk = std::get<1>(delete_bitmap_tuple);
auto end_bmk = std::get<2>(delete_bitmap_tuple);
remove(start_bmk, end_bmk);
}
_stale_delete_bitmap.erase(version_str);
}
}
}

uint64_t DeleteBitmap::get_delete_bitmap_count() {
std::shared_lock l(lock);
uint64_t count = 0;
for (auto it = delete_bitmap.begin(); it != delete_bitmap.end(); it++) {
if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
count++;
}
}
return count;
}

// We cannot just copy the underlying memory to construct a string
// due to equivalent objects may have different padding bytes.
// Reading padding bytes is undefined behavior, neither copy nor
Expand Down
Loading
Loading