Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
// delete rowset in "to_delete" directly
for (auto& rs : to_delete) {
LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because of same version";
StorageEngine::instance()->add_unused_rowset(rs);
if (rs->is_local()) {
StorageEngine::instance()->add_unused_rowset(rs);
}
}
}
return Status::OK();
Expand Down Expand Up @@ -604,7 +606,9 @@ void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool
} else {
for (auto& rs : to_delete) {
_timestamped_version_tracker.delete_version(rs->version());
StorageEngine::instance()->add_unused_rowset(rs);
if (rs->is_local()) {
StorageEngine::instance()->add_unused_rowset(rs);
}
}
}
}
Expand Down Expand Up @@ -830,7 +834,9 @@ void Tablet::delete_expired_stale_rowset() {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
// delete rowset
StorageEngine::instance()->add_unused_rowset(it->second);
if (it->second->is_local()) {
StorageEngine::instance()->add_unused_rowset(it->second);
}
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
Expand Down Expand Up @@ -2311,6 +2317,12 @@ Status Tablet::_follow_cooldowned_data() {
std::vector<RowsetSharedPtr> overlap_rowsets;
bool version_aligned = false;

// Holding these to delete rowsets' shared ptr until save meta can avoid trash sweeping thread
// deleting these rowsets' files before rowset meta has been removed from disk, which may cause
// data loss when BE reboot before save meta to disk.
std::vector<RowsetSharedPtr> to_delete;
std::vector<RowsetSharedPtr> to_add;

{
std::lock_guard wlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
Expand All @@ -2336,17 +2348,18 @@ Status Tablet::_follow_cooldowned_data() {
}
}
std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator);

// Find different rowset in `overlap_rowsets` and `cooldown_meta_pb.rs_metas`
auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
auto rs_it = overlap_rowsets.begin();
for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end();
++rs_pb_it, ++rs_it) {
// skip cooldowned rowset with same version in BE
if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) {
if (rs_pb_it->rowset_id_v2() != (*rs_it)->rowset_id().to_string()) {
break;
}
}
std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end());
std::vector<RowsetSharedPtr> to_add;

to_delete.assign(rs_it, overlap_rowsets.end());
to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
auto rs_meta = std::make_shared<RowsetMeta>();
Expand All @@ -2361,12 +2374,29 @@ Status Tablet::_follow_cooldowned_data() {
// TODO(plat1ko): process primary key
_tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
}

{
std::lock_guard rlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
save_meta();
}

if (!to_add.empty()) {
LOG(INFO) << "modify rowsets when follow cooldowned data, tablet_id=" << tablet_id()
<< [&]() {
std::stringstream ss;
ss << " delete rowsets:\n";
for (auto&& rs : to_delete) {
ss << rs->version() << ' ' << rs->rowset_id() << '\n';
}
ss << "add rowsets:\n";
for (auto&& rs : to_add) {
ss << rs->version() << ' ' << rs->rowset_id() << '\n';
}
return ss.str();
}();
}

return Status::OK();
}

Expand Down