Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ Status CloudBaseCompaction::prepare_compact() {

RETURN_IF_ERROR(pick_rowsets_to_compact());

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return Status::OK();
}

Status CloudBaseCompaction::request_global_lock() {
// prepare compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand Down Expand Up @@ -113,25 +133,7 @@ Status CloudBaseCompaction::prepare_compact() {
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CloudBaseCompaction : public CloudCompactionMixin {

Status prepare_compact() override;
Status execute_compact() override;
Status request_global_lock();

void do_lease();

Expand Down
73 changes: 30 additions & 43 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ Status CloudCumulativeCompaction::prepare_compact() {
}
}

int tried = 0;
PREPARE_TRY_AGAIN:

bool need_sync_tablet = true;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand All @@ -83,7 +80,7 @@ Status CloudCumulativeCompaction::prepare_compact() {
// pick rowsets to compact
auto st = pick_rowsets_to_compact();
if (!st.ok()) {
if (tried == 0 && _last_delete_version.first != -1) {
if (_last_delete_version.first != -1) {
// we meet a delete version, should increase the cumulative point to let base compaction handle the delete version.
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
Expand All @@ -96,6 +93,30 @@ Status CloudCumulativeCompaction::prepare_compact() {
return st;
}

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
return st;
}

Status CloudCumulativeCompaction::request_global_lock() {
// prepare compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand All @@ -121,7 +142,7 @@ Status CloudCumulativeCompaction::prepare_compact() {
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
Status st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
Expand All @@ -130,22 +151,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_TABLET_BUSY) {
if (config::enable_parallel_cumu_compaction && resp.version_in_compaction_size() > 0 &&
++tried <= 2) {
_max_conflict_version = *std::max_element(resp.version_in_compaction().begin(),
resp.version_in_compaction().end());
LOG_INFO("retry pick input rowsets")
.tag("job_id", _uuid)
.tag("max_conflict_version", _max_conflict_version)
.tag("tried", tried)
.tag("msg", resp.status().msg());
goto PREPARE_TRY_AGAIN;
} else {
LOG_WARNING("failed to prepare cumu compaction")
.tag("job_id", _uuid)
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
}
LOG_WARNING("failed to prepare cumu compaction")
.tag("job_id", _uuid)
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
Expand All @@ -159,29 +168,7 @@ Status CloudCumulativeCompaction::prepare_compact() {
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
return st;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

Status prepare_compact() override;
Status execute_compact() override;
Status request_global_lock();

void do_lease();

Expand Down
37 changes: 19 additions & 18 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ Status CloudFullCompaction::prepare_compact() {

RETURN_IF_ERROR(pick_rowsets_to_compact());

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return Status::OK();
}
Status CloudFullCompaction::request_global_lock() {
// prepare compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand Down Expand Up @@ -88,25 +107,7 @@ Status CloudFullCompaction::prepare_compact() {
// tablet not found
cloud_tablet()->clear_cache();
}
return st;
}

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_full_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CloudFullCompaction : public CloudCompactionMixin {

Status prepare_compact() override;
Status execute_compact() override;
Status request_global_lock();

void do_lease();

Expand Down
Loading
Loading