Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
_last_base_compaction_success_millis(0),
_cumulative_point(K_INVALID_CUMULATIVE_POINT),
_cumulative_compaction_type(cumulative_compaction_type) {
// change _rs_graph to _timestamped_version_tracker
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
// construct _timestamped_versioned_tracker from rs and stale rs meta
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(),
_tablet_meta->all_stale_rs_metas());
}

OLAPStatus Tablet::_init_once_action() {
Expand Down Expand Up @@ -108,6 +109,20 @@ OLAPStatus Tablet::_init_once_action() {
_inc_rs_version_map[version] = std::move(rowset);
}

// init stale rowset
for (auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) {
Version version = stale_rs_meta->version();
RowsetSharedPtr rowset;
res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id()
<< ", schema_hash:" << schema_hash() << ", version=" << version
<< ", res:" << res;
return res;
}
_stale_rs_version_map[version] = std::move(rowset);
}

return res;
}

Expand Down Expand Up @@ -276,6 +291,15 @@ const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) cons
return iter->second;
}

const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version& version) const {
auto iter = _stale_rs_version_map.find(version);
if (iter == _stale_rs_version_map.end()) {
VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name();
return nullptr;
}
return iter->second;
}

// This function only be called by SnapshotManager to perform incremental clone.
// It will be called under protected of _meta_lock(SnapshotManager will fetch it manually),
// so it is no need to lock here.
Expand Down Expand Up @@ -895,6 +919,12 @@ bool Tablet::check_path(const std::string& path_to_check) const {
return true;
}
}
for (auto& stale_version_rowset : _stale_rs_version_map) {
bool ret = stale_version_rowset.second->check_path(path_to_check);
if (ret) {
return true;
}
}
return false;
}

Expand All @@ -919,6 +949,11 @@ bool Tablet::check_rowset_id(const RowsetId& rowset_id) {
return true;
}
}
for (auto& stale_version_rowset : _stale_rs_version_map) {
if (stale_version_rowset.second->rowset_id() == rowset_id) {
return true;
}
}
if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(), tablet_uid(), rowset_id)) {
return true;
}
Expand Down Expand Up @@ -1120,6 +1155,14 @@ bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) {
find_version = true;
}
}
for (auto& stale_version_rowset : _stale_rs_version_map) {
if (stale_version_rowset.second->rowset_id() == rowset_meta->rowset_id()) {
find_rowset_id = true;
}
if (stale_version_rowset.second->contains_version(rowset_meta->version())) {
find_version = true;
}
}
return find_rowset_id || !find_version;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class Tablet : public BaseTablet {
// The caller must call hold _meta_lock when call this two function.
const RowsetSharedPtr get_rowset_by_version(const Version& version) const;
const RowsetSharedPtr get_inc_rowset_by_version(const Version& version) const;
const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;

const RowsetSharedPtr rowset_with_max_version() const;

Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
_inc_rs_metas.push_back(std::move(rs_meta));
}

for (auto& it : tablet_meta_pb.stale_rs_metas()) {
RowsetMetaSharedPtr rs_meta(new AlphaRowsetMeta());
rs_meta->init_from_pb(it);
_stale_rs_metas.push_back(std::move(rs_meta));
}

// generate AlterTabletTask
if (tablet_meta_pb.has_alter_task()) {
AlterTabletTask* alter_tablet_task = new AlterTabletTask();
Expand Down Expand Up @@ -431,6 +437,9 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
for (auto rs : _inc_rs_metas) {
rs->to_rowset_pb(tablet_meta_pb->add_inc_rs_metas());
}
for (auto rs : _stale_rs_metas) {
rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
}
_schema.to_schema_pb(tablet_meta_pb->mutable_schema());
if (_alter_task != nullptr) {
_alter_task->to_alter_pb(tablet_meta_pb->mutable_alter_task());
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class TabletMeta {
const std::vector<RowsetMetaSharedPtr>& to_delete);
void revise_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas);


void revise_inc_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas);

inline const std::vector<RowsetMetaSharedPtr>& all_inc_rs_metas() const;
inline const std::vector<RowsetMetaSharedPtr>& all_stale_rs_metas() const;
OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta);
Expand Down
163 changes: 162 additions & 1 deletion be/src/olap/version_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace doris {
void TimestampedVersionTracker::_construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
int64_t max_version = 0;

// construct the roset graph
// construct the rowset graph
_version_graph.reconstruct_version_graph(rs_metas, &max_version);
}

Expand All @@ -43,6 +43,167 @@ void TimestampedVersionTracker::construct_versioned_tracker(const std::vector<Ro
_construct_versioned_tracker(rs_metas);
}

void TimestampedVersionTracker::construct_versioned_tracker(
const std::vector<RowsetMetaSharedPtr>& rs_metas,
const std::vector<RowsetMetaSharedPtr>& stale_metas) {

if (rs_metas.empty()) {
VLOG(3) << "there is no version in the header.";
return;
}
_stale_version_path_map.clear();
_next_path_id = 1;
_construct_versioned_tracker(rs_metas);

// init _stale_version_path_map
_init_stale_version_path_map(rs_metas, stale_metas);
}

void TimestampedVersionTracker::_init_stale_version_path_map(
const std::vector<RowsetMetaSharedPtr>& rs_metas,
const std::vector<RowsetMetaSharedPtr>& stale_metas) {

if (stale_metas.empty()) {
return;
}

// sort stale meta by version diff (second version - first version)
std::list<RowsetMetaSharedPtr> sorted_stale_metas;
for (auto& rs : stale_metas) {
sorted_stale_metas.emplace_back(rs);
}

// 1. Sort the existing rowsets by version in ascending order
sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
// compare by version diff between version.first and version.second
int64_t a_diff = a->version().second - a->version().first;
int64_t b_diff = b->version().second - b->version().first;

int diff = a_diff - b_diff;
if (diff < 0) {
return true;
}
else if (diff > 0) {
return false;
}
// when the version diff is equal, compare rowset createtime
return a->creation_time() < b->creation_time();
});

// first_version -> (second_version -> rowset_meta)
std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;

// 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
// when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
// and add the path to _stale_version_path_map.
for(auto& stale_meta:sorted_stale_metas) {
std::vector<RowsetMetaSharedPtr> stale_path;
// 2.1 find a path in stale_map can replace current stale_meta version
bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), stale_meta->end_version(), &stale_path);

// 2.2 add stale_meta to stale_map
auto start_iter = stale_map.find(stale_meta->start_version());
if (start_iter != stale_map.end()) {
start_iter->second[stale_meta->end_version()] = stale_meta;
} else {
std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
item[stale_meta->end_version()] = stale_meta;
stale_map[stale_meta->start_version()] = std::move(item);
}
// 2.3 add version to version_graph
Version stale_meta_version = stale_meta->version();
add_version(stale_meta_version);
// 2.4 find the path
if (r) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get this logic...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get this logic...

// add the path to _stale_version_path_map
add_stale_path_version(stale_path);
// remove stale_path from stale_map
for (auto stale_item:stale_path) {
stale_map[stale_item->start_version()].erase(stale_item->end_version());

if (stale_map[stale_item->start_version()].empty()) {
stale_map.erase(stale_item->start_version());
}
}
}
}

// 3. generate stale path from rs_metas
for(auto& stale_meta:rs_metas) {
std::vector<RowsetMetaSharedPtr> stale_path;
// 3.1 find a path in stale_map can replace current stale_meta version
bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), stale_meta->end_version(), &stale_path);

// 3.2 find the path
if (r) {
// add the path to _stale_version_path_map
add_stale_path_version(stale_path);
// remove stale_path from stale_map
for (auto stale_item:stale_path) {
stale_map[stale_item->start_version()].erase(stale_item->end_version());

if (stale_map[stale_item->start_version()].empty()) {
stale_map.erase(stale_item->start_version());
}
}
}
}

// 4. process remain stale rowset_meta in stale_map
auto map_iter = stale_map.begin();
while (map_iter != stale_map.end()) {
auto second_iter = map_iter->second.begin();
while(second_iter != map_iter->second.end()) {
// each remain stale rowset_meta generate a stale path
std::vector<RowsetMetaSharedPtr> stale_path;
stale_path.push_back(second_iter->second);
add_stale_path_version(stale_path);

second_iter++;
}
map_iter++;
}
}

bool TimestampedVersionTracker::_find_path_from_stale_map(
const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& stale_map,
int64_t first_version, int64_t second_version, std::vector<RowsetMetaSharedPtr>* stale_path) {

auto first_iter = stale_map.find(first_version);
// if first_version not in stale_map, there is no path.
if (first_iter == stale_map.end()) {
return false;
}
auto& second_version_map = first_iter->second;
auto second_iter = second_version_map.find(second_version);
// if second_version in stale_map, find a path.
if (second_iter != second_version_map.end()) {
auto row_meta = second_iter->second;
// add rowset to path
stale_path->push_back(row_meta);
return true;
}

// traverse the first version map to backtracking _find_path_from_stale_map
auto map_iter = second_version_map.begin();
while (map_iter != second_version_map.end()) {
// the version greater than second_version, we can't find path in stale_map
if (map_iter->first > second_version) {
continue;
}
// backtracking _find_path_from_stale_map find from map_iter->first + 1 to second_version
stale_path->push_back(map_iter->second);
bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version, stale_path);
if (r) {
return true;
}
// there is no path in current version, pop and continue
stale_path->pop_back();
map_iter++;
}

return false;
}

void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {

Expand Down
18 changes: 16 additions & 2 deletions be/src/olap/version_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ using PathVersionListSharedPtr = std::shared_ptr<TimestampedVersionPathContainer
/// after the path is expired.
class TimestampedVersionTracker {
public:
/// Construct rowsets version tracker by rs_metas and stale version path map.
/// Construct rowsets version tracker by main path rowset meta.
void construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas);

/// Construct rowsets version tracker by main path rowset meta and stale rowset meta.
void construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas,
const std::vector<RowsetMetaSharedPtr>& stale_metas);

/// Recover rowsets version tracker from stale version path map. When delete operation fails, the
/// tracker can be recovered from deleted stale_version_path_map.
void recover_versioned_tracker(const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map);
Expand Down Expand Up @@ -180,9 +184,19 @@ class TimestampedVersionTracker {
void get_stale_version_path_json_doc(rapidjson::Document& path_arr);

private:
/// Construct rowsets version tracker with stale rowsets.
/// Construct rowsets version tracker with main path rowset meta.
void _construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas);

/// init stale_version_path_map by main path rowset meta and stale rowset meta.
void _init_stale_version_path_map(const std::vector<RowsetMetaSharedPtr>& rs_metas,
const std::vector<RowsetMetaSharedPtr>& stale_metas);

/// find a path in stale_map from first_version to second_version, stale_path is used as result.
bool _find_path_from_stale_map(
const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& stale_map,
int64_t first_version, int64_t second_version,
std::vector<RowsetMetaSharedPtr>* stale_path);

private:
// This variable records the id of path version which will be dispatched to next path version,
// it is not persisted.
Expand Down
17 changes: 17 additions & 0 deletions be/test/olap/timestamped_version_tracker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,23 @@ TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker) {
ASSERT_EQ(1, tracker._next_path_id);
}

TEST_F(TestTimestampedVersionTracker, construct_version_tracker_by_stale_meta) {

std::vector<RowsetMetaSharedPtr> rs_metas;
std::vector<RowsetMetaSharedPtr> expried_rs_metas;
std::vector<Version> version_path;

init_all_rs_meta(&rs_metas);
init_expried_row_rs_meta(&expried_rs_metas);

TimestampedVersionTracker tracker;
tracker.construct_versioned_tracker(rs_metas, expried_rs_metas);

ASSERT_EQ(10, tracker._version_graph._version_graph.size());
ASSERT_EQ(4, tracker._stale_version_path_map.size());
ASSERT_EQ(5, tracker._next_path_id);
}

TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker_with_same_rowset) {

std::vector<RowsetMetaSharedPtr> rs_metas;
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ message TabletMetaPB {
optional int64 end_rowset_id = 15;
optional RowsetTypePB preferred_rowset_type = 16;
optional TabletTypePB tablet_type = 17;
repeated RowsetMetaPB stale_rs_metas = 18;
}

message OLAPIndexHeaderMessage {
Expand Down