Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");

CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudBaseCompaction::~CloudBaseCompaction() = default;

Expand Down Expand Up @@ -330,8 +325,7 @@ Status CloudBaseCompaction::modify_rowsets() {
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
int64_t initiator = this->initiator();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
Expand Down Expand Up @@ -403,8 +397,8 @@ Status CloudBaseCompaction::modify_rowsets() {
return Status::OK();
}

void CloudBaseCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
Status CloudBaseCompaction::garbage_collection() {
RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
idx->set_tablet_id(_tablet->tablet_id());
Expand All @@ -418,9 +412,7 @@ void CloudBaseCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
Expand All @@ -429,6 +421,7 @@ void CloudBaseCompaction::garbage_collection() {
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

void CloudBaseCompaction::do_lease() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@ class CloudBaseCompaction : public CloudCompactionMixin {

Status modify_rowsets() override;

void garbage_collection() override;
Status garbage_collection() override;

void _filter_input_rowset();

void build_basic_info();

ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
Expand Down
33 changes: 20 additions & 13 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@ bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Expand Down Expand Up @@ -284,8 +279,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
});

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
int64_t initiator = this->initiator();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
Expand All @@ -305,6 +299,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", {
LOG(INFO) << "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id"
<< cloud_tablet()->tablet_id();
return Status::InternalError(
"CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id {}",
cloud_tablet()->tablet_id());
});
cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
Expand Down Expand Up @@ -420,8 +421,8 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
return Status::OK();
}

void CloudCumulativeCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
Status CloudCumulativeCompaction::garbage_collection() {
RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
idx->set_tablet_id(_tablet->tablet_id());
Expand All @@ -435,17 +436,23 @@ void CloudCumulativeCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", {
LOG(INFO) << "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id"
<< cloud_tablet()->tablet_id();
return Status::InternalError(
"CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id {}",
cloud_tablet()->tablet_id());
});
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
LOG_WARNING("failed to abort compaction job")
.tag("job_id", _uuid)
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

Status modify_rowsets() override;

void garbage_collection() override;
Status garbage_collection() override;

void update_cumulative_point();

Status process_old_version_delete_bitmap();

ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
int64_t _max_conflict_version = 0;
// Snapshot values when pick input rowsets
Expand Down
20 changes: 6 additions & 14 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ bvar::Adder<uint64_t> full_output_size("full_compaction", "output_size");

CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudFullCompaction::~CloudFullCompaction() = default;

Expand Down Expand Up @@ -227,10 +222,8 @@ Status CloudFullCompaction::modify_rowsets() {
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(initiator));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator()));
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}

cloud::FinishTabletJobResponse resp;
Expand Down Expand Up @@ -271,7 +264,7 @@ Status CloudFullCompaction::modify_rowsets() {
return Status::OK();
}

void CloudFullCompaction::garbage_collection() {
Status CloudFullCompaction::garbage_collection() {
//file_cache_garbage_collection();
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand All @@ -286,9 +279,7 @@ void CloudFullCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::FULL);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
Expand All @@ -297,6 +288,7 @@ void CloudFullCompaction::garbage_collection() {
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

void CloudFullCompaction::do_lease() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_full_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CloudFullCompaction : public CloudCompactionMixin {
std::string_view compaction_name() const override { return "CloudFullCompaction"; }

Status modify_rowsets() override;
void garbage_collection() override;
Status garbage_collection() override;

private:
Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator);
Expand All @@ -52,7 +52,6 @@ class CloudFullCompaction : public CloudCompactionMixin {

ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
// Snapshot values when pick input rowsets
int64_t _base_compaction_cnt = 0;
Expand Down
17 changes: 9 additions & 8 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,23 +1201,24 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in
return st;
}

Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator) {
VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
<< ",lock_id:" << lock_id;
void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
int64_t initiator, int64_t tablet_id) {
LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
<< ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id;
RemoveDeleteBitmapUpdateLockRequest req;
RemoveDeleteBitmapUpdateLockResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet.tablet_id());
req.set_table_id(table_id);
req.set_tablet_id(tablet_id);
req.set_lock_id(lock_id);
req.set_initiator(initiator);
auto st = retry_rpc("remove delete bitmap update lock", req, &res,
&MetaService_Stub::remove_delete_bitmap_update_lock);
if (!st.ok()) {
LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id()
<< " lock_id=" << lock_id << " st=" << st.to_string();
LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id
<< ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
<< ",st=" << st.to_string();
}
return st;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return the status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove_delete_bitmap_update_lock only tries to release the lock. Sometimes it may fail — for example, the lock id is not found because the lock has expired and a load task has already taken it. In that situation there is nothing we can do; printing a warning log is enough, so there is no need to return an error status.

}

Status CloudMetaMgr::remove_old_version_delete_bitmap(
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class CloudMetaMgr {
Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);

Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator,
int64_t tablet_id);

Status remove_old_version_delete_bitmap(
int64_t tablet_id,
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
Expand All @@ -54,7 +55,6 @@ namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Int32(delete_bitmap_lock_expiration_seconds, "10");
DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, "10");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ DECLARE_mBool(save_load_error_log_to_s3);
// the threads which sync the data loaded in other clusters
DECLARE_mInt32(sync_load_for_tablets_thread);

DECLARE_Int32(delete_bitmap_lock_expiration_seconds);
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);

// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);
Expand Down
31 changes: 26 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,12 @@ int64_t CloudCompactionMixin::get_compaction_permits() {

CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabletSPtr tablet,
const std::string& label)
: Compaction(tablet, label), _engine(engine) {}
: Compaction(tablet, label), _engine(engine) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}

Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
OlapStopWatch watch;
Expand Down Expand Up @@ -1439,11 +1444,26 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
return Status::OK();
}

// Derives the delete-bitmap-lock initiator id for this compaction from `_uuid`.
// The uuid bytes are hashed with HashUtil::hash64 (third argument 0, presumably
// the seed -- confirm against HashUtil), and the hash is masked with
// numeric_limits<int64_t>::max() before being returned as an int64_t.
int64_t CloudCompactionMixin::initiator() const {
const auto uuid_hash = HashUtil::hash64(_uuid.data(), _uuid.size(), 0);
return uuid_hash & std::numeric_limits<int64_t>::max();
}

Status CloudCompactionMixin::execute_compact() {
TEST_INJECTION_POINT("Compaction::do_compaction");
int64_t permits = get_compaction_permits();
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits),
[&](const doris::Exception& ex) { garbage_collection(); });
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
execute_compact_impl(permits), [&](const doris::Exception& ex) {
auto st = garbage_collection();
if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) {
// If compaction fails, the BE tries to abort the compaction job; the delete bitmap
// lock is released when the abort succeeds. If the abort fails, the lock is not
// released, so the BE sends this RPC to the meta service to try to release the
// delete bitmap lock.
_engine.meta_mgr().remove_delete_bitmap_update_lock(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"If the RPC to release the lock fails here, will the lock still not be released?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this PR only tries to release the lock; the release may still fail with TXN_KV_CONFLICT. This problem has existed for a long time. If the release RPC fails, the lock will be released when it expires.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luwei16 It's OK if the RPC fails to release the lock here; it's the same as an RPC failure in the commit phase.

_tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(),
_tablet->tablet_id());
}
});
_load_segment_to_cache();
return Status::OK();
}
Expand Down Expand Up @@ -1488,9 +1508,9 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
return Status::OK();
}

void CloudCompactionMixin::garbage_collection() {
Status CloudCompactionMixin::garbage_collection() {
if (!config::enable_file_cache) {
return;
return Status::OK();
}
if (_output_rs_writer) {
auto* beta_rowset_writer = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get());
Expand All @@ -1501,6 +1521,7 @@ void CloudCompactionMixin::garbage_collection() {
file_cache->remove_if_cached_async(file_key);
}
}
return Status::OK();
}

void CloudCompactionMixin::update_compaction_level() {
Expand Down
Loading
Loading