Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 149 additions & 127 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ static bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found
static bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
"doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);

const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 10s;
const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 1s;

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS);
Expand Down Expand Up @@ -678,153 +678,161 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) {

void Tablet::delete_expired_stale_rowset() {
int64_t now = UnixSeconds();
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
double expired_stale_sweep_endtime =
::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
if (config::tablet_rowset_stale_sweep_by_size) {
expired_stale_sweep_endtime = now;
}

std::vector<int64_t> path_id_vec;
// capture the path version to delete
_timestamped_version_tracker.capture_expired_paths(
static_cast<int64_t>(expired_stale_sweep_endtime), &path_id_vec);

if (path_id_vec.empty()) {
return;
}

const RowsetSharedPtr lastest_delta = rowset_with_max_version();
if (lastest_delta == nullptr) {
LOG(WARNING) << "lastest_delta is null " << tablet_id();
return;
}

// fetch missing version before delete
std::vector<Version> missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(), &missed_versions);
// hold write lock while processing stable rowset
{
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
double expired_stale_sweep_endtime =
::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
if (config::tablet_rowset_stale_sweep_by_size) {
expired_stale_sweep_endtime = now;
}

if (!missed_versions.empty()) {
LOG(WARNING) << "tablet:" << full_name()
<< ", missed version for version:" << lastest_delta->end_version();
_print_missed_versions(missed_versions);
return;
}
std::vector<int64_t> path_id_vec;
// capture the path version to delete
_timestamped_version_tracker.capture_expired_paths(
static_cast<int64_t>(expired_stale_sweep_endtime), &path_id_vec);

// do check consistent operation
auto path_id_iter = path_id_vec.begin();
if (path_id_vec.empty()) {
return;
}

std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
while (path_id_iter != path_id_vec.end()) {
PathVersionListSharedPtr version_path =
_timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
const RowsetSharedPtr lastest_delta = rowset_with_max_version();
if (lastest_delta == nullptr) {
LOG(WARNING) << "lastest_delta is null " << tablet_id();
return;
}

Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;
// fetch missing version before delete
std::vector<Version> missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(), &missed_versions);

Status status = capture_consistent_versions(test_version, nullptr);
// 1. When there is no consistent versions, we must reconstruct the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
std::vector<Version> after_missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(), &after_missed_versions);
if (!missed_versions.empty()) {
LOG(WARNING) << "tablet:" << full_name()
<< ", missed version for version:" << lastest_delta->end_version();
_print_missed_versions(missed_versions);
return;
}

// 2.1 check whether missed_versions and after_missed_versions are the same.
// when they are the same, it means we can delete the path securely.
bool is_missing = missed_versions.size() != after_missed_versions.size();
// do check consistent operation
auto path_id_iter = path_id_vec.begin();

if (!is_missing) {
for (int ver_index = 0; ver_index < missed_versions.size(); ver_index++) {
if (missed_versions[ver_index] != after_missed_versions[ver_index]) {
is_missing = true;
break;
}
}
}
std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
while (path_id_iter != path_id_vec.end()) {
PathVersionListSharedPtr version_path =
_timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);

if (is_missing) {
LOG(WARNING) << "The consistent version check fails, there are bugs. "
<< "Reconstruct the tracker to recover versions in tablet="
<< tablet_id();
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;

// 3. try to recover
_timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);
Status status = capture_consistent_versions(test_version, nullptr);
// 1. When there is no consistent versions, we must reconstruct the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
std::vector<Version> after_missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(), &after_missed_versions);

// 4. double check the consistent versions
// fetch missing version after recover
std::vector<Version> recover_missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(),
&recover_missed_versions);
// 2.1 check whether missed_versions and after_missed_versions are the same.
// when they are the same, it means we can delete the path securely.
bool is_missing = missed_versions.size() != after_missed_versions.size();

// 4.1 check whether missed_versions and recover_missed_versions are the same.
// when they are the same, it means we recover successfully.
bool is_recover_missing = missed_versions.size() != recover_missed_versions.size();

if (!is_recover_missing) {
if (!is_missing) {
for (int ver_index = 0; ver_index < missed_versions.size(); ver_index++) {
if (missed_versions[ver_index] != recover_missed_versions[ver_index]) {
is_recover_missing = true;
if (missed_versions[ver_index] != after_missed_versions[ver_index]) {
is_missing = true;
break;
}
}
}

// 5. check recover fail, version is mission
if (is_recover_missing) {
if (!config::ignore_rowset_stale_unconsistent_delete) {
LOG(FATAL) << "rowset stale unconsistent delete. tablet= " << tablet_id();
} else {
LOG(WARNING) << "rowset stale unconsistent delete. tablet= " << tablet_id();
if (is_missing) {
LOG(WARNING) << "The consistent version check fails, there are bugs. "
<< "Reconstruct the tracker to recover versions in tablet="
<< tablet_id();

// 3. try to recover
_timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);

// 4. double check the consistent versions
// fetch missing version after recover
std::vector<Version> recover_missed_versions;
calc_missed_versions_unlocked(lastest_delta->end_version(),
&recover_missed_versions);

// 4.1 check whether missed_versions and recover_missed_versions are the same.
// when they are the same, it means we recover successfully.
bool is_recover_missing =
missed_versions.size() != recover_missed_versions.size();

if (!is_recover_missing) {
for (int ver_index = 0; ver_index < missed_versions.size(); ver_index++) {
if (missed_versions[ver_index] != recover_missed_versions[ver_index]) {
is_recover_missing = true;
break;
}
}
}

// 5. check recover fail, version is mission
if (is_recover_missing) {
if (!config::ignore_rowset_stale_unconsistent_delete) {
LOG(FATAL)
<< "rowset stale unconsistent delete. tablet= " << tablet_id();
} else {
LOG(WARNING)
<< "rowset stale unconsistent delete. tablet= " << tablet_id();
}
}
}
return;
}
return;
}
path_id_iter++;
}

auto old_size = _stale_rs_version_map.size();
auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();

// do delete operation
auto to_delete_iter = stale_version_path_map.begin();
while (to_delete_iter != stale_version_path_map.end()) {
std::vector<TimestampedVersionSharedPtr>& to_delete_version =
to_delete_iter->second->timestamped_versions();
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
// delete rowset
StorageEngine::instance()->add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
<< timestampedVersion->version().second
<< "] move to unused_rowset success " << std::fixed
<< expired_stale_sweep_endtime;
} else {
LOG(WARNING) << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
<< timestampedVersion->version().second
<< "] not find in stale rs version map";
path_id_iter++;
}

auto old_size = _stale_rs_version_map.size();
auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();

// do delete operation
auto to_delete_iter = stale_version_path_map.begin();
while (to_delete_iter != stale_version_path_map.end()) {
std::vector<TimestampedVersionSharedPtr>& to_delete_version =
to_delete_iter->second->timestamped_versions();
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
// delete rowset
StorageEngine::instance()->add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
<< timestampedVersion->version().second
<< "] move to unused_rowset success " << std::fixed
<< expired_stale_sweep_endtime;
} else {
LOG(WARNING) << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
<< timestampedVersion->version().second
<< "] not find in stale rs version map";
}
_delete_stale_rowset_by_version(timestampedVersion->version());
}
_delete_stale_rowset_by_version(timestampedVersion->version());
to_delete_iter++;
}
to_delete_iter++;
}

bool reconstructed = _reconstruct_version_tracker_if_necessary();

VLOG_NOTICE << "delete stale rowset _stale_rs_version_map tablet=" << full_name()
<< " current_size=" << _stale_rs_version_map.size() << " old_size=" << old_size
<< " current_meta_size=" << _tablet_meta->all_stale_rs_metas().size()
<< " old_meta_size=" << old_meta_size << " sweep endtime " << std::fixed
<< expired_stale_sweep_endtime << ", reconstructed=" << reconstructed;
bool reconstructed = _reconstruct_version_tracker_if_necessary();

VLOG_NOTICE << "delete stale rowset _stale_rs_version_map tablet=" << full_name()
<< " current_size=" << _stale_rs_version_map.size() << " old_size=" << old_size
<< " current_meta_size=" << _tablet_meta->all_stale_rs_metas().size()
<< " old_meta_size=" << old_meta_size << " sweep endtime " << std::fixed
<< expired_stale_sweep_endtime << ", reconstructed=" << reconstructed;
}
#ifndef BE_TEST
save_meta();
{
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
save_meta();
}
#endif
}

Expand Down Expand Up @@ -3314,8 +3322,10 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
RowsetIdUnorderedSet rowset_ids_to_del;
int64_t cur_version = rowset->start_version();

OlapStopWatch watch;
std::vector<segment_v2::SegmentSharedPtr> segments;
static_cast<void>(_load_rowset_segments(rowset, &segments));
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
auto t1 = watch.get_elapse_time_us();

{
std::shared_lock meta_rlock(_meta_lock);
Expand All @@ -3328,6 +3338,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
}
cur_rowset_ids = all_rs_id(cur_version - 1);
}
auto t2 = watch.get_elapse_time_us();

_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del);
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
Expand All @@ -3338,21 +3350,31 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
std::shared_lock meta_rlock(_meta_lock);
specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
}
auto t3 = watch.get_elapse_time_us();

OlapStopWatch watch;
auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get(), rowset_writer));
RETURN_IF_ERROR(token->wait());
RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));

std::stringstream ss;
if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
} else {
ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - t1
<< ", get rowsets: " << t3 - t2
<< ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")";
}

size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur max_version: " << cur_version << ", transaction_id: " << txn_id
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
<< ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
// only do correctness check if the rowset has at least one row written
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "util/string_util.h"
#include "util/time.h"
#include "util/uid_util.h"

using std::string;
Expand Down Expand Up @@ -437,12 +438,22 @@ Status TabletMeta::_save_meta(DataDir* data_dir) {
<< " tablet=" << full_name() << " _tablet_uid=" << _tablet_uid.to_string();
}
string meta_binary;

auto t1 = MonotonicMicros();
RETURN_IF_ERROR(serialize(&meta_binary));
auto t2 = MonotonicMicros();
Status status = TabletMetaManager::save(data_dir, tablet_id(), schema_hash(), meta_binary);
if (!status.ok()) {
LOG(FATAL) << "fail to save tablet_meta. status=" << status << ", tablet_id=" << tablet_id()
<< ", schema_hash=" << schema_hash();
}
auto t3 = MonotonicMicros();
auto cost = t3 - t1;
if (cost > 1 * 1000 * 1000) {
LOG(INFO) << "save tablet(" << full_name() << ") meta too slow. serialize cost " << t2 - t1
<< "(us), serialized binary size: " << meta_binary.length()
<< "(bytes), write rocksdb cost " << t3 - t2 << "(us)";
}
return status;
}

Expand Down